tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use super::*;

pub(super) fn add_included_memory_component_bytes(
    accounting_enabled: bool,
    component: &AtomicU64,
    shared_used_bytes: &AtomicU64,
    used_bytes: &AtomicU64,
    bytes: usize,
) {
    if !accounting_enabled || bytes == 0 {
        return;
    }

    let increment = saturating_u64_from_usize(bytes);
    component.fetch_add(increment, Ordering::AcqRel);
    shared_used_bytes.fetch_add(increment, Ordering::AcqRel);
    used_bytes.fetch_add(increment, Ordering::AcqRel);
}

pub(super) fn sub_included_memory_component_bytes(
    accounting_enabled: bool,
    component: &AtomicU64,
    shared_used_bytes: &AtomicU64,
    used_bytes: &AtomicU64,
    bytes: usize,
) {
    if !accounting_enabled || bytes == 0 {
        return;
    }

    let decrement = saturating_u64_from_usize(bytes);
    component.fetch_sub(decrement, Ordering::AcqRel);
    shared_used_bytes.fetch_sub(decrement, Ordering::AcqRel);
    used_bytes.fetch_sub(decrement, Ordering::AcqRel);
}

pub(super) fn reset_included_memory_component_bytes(
    accounting_enabled: bool,
    component: &AtomicU64,
    shared_used_bytes: &AtomicU64,
    used_bytes: &AtomicU64,
    bytes: usize,
) {
    if !accounting_enabled {
        return;
    }

    let next = saturating_u64_from_usize(bytes);
    let previous = component.swap(next, Ordering::AcqRel);
    if next >= previous {
        let delta = next.saturating_sub(previous);
        shared_used_bytes.fetch_add(delta, Ordering::AcqRel);
        used_bytes.fetch_add(delta, Ordering::AcqRel);
    } else {
        let delta = previous.saturating_sub(next);
        shared_used_bytes.fetch_sub(delta, Ordering::AcqRel);
        used_bytes.fetch_sub(delta, Ordering::AcqRel);
    }
}

pub(super) fn with_included_memory_delta<T, R>(
    accounting_enabled: bool,
    component: &AtomicU64,
    shared_used_bytes: &AtomicU64,
    used_bytes: &AtomicU64,
    state: &mut T,
    measure: impl Fn(&T) -> usize,
    mutate: impl FnOnce(&mut T) -> R,
) -> R {
    if !accounting_enabled {
        return mutate(state);
    }

    let before = measure(state);
    let result = mutate(state);
    let after = measure(state);
    if after >= before {
        add_included_memory_component_bytes(
            true,
            component,
            shared_used_bytes,
            used_bytes,
            after.saturating_sub(before),
        );
    } else {
        sub_included_memory_component_bytes(
            true,
            component,
            shared_used_bytes,
            used_bytes,
            before.saturating_sub(after),
        );
    }
    result
}

#[derive(Clone, Copy)]
pub(in crate::engine::storage_engine) struct RegistryMemoryContext<'a> {
    pub(in crate::engine::storage_engine) accounting_enabled: bool,
    pub(in crate::engine::storage_engine) registry: &'a RwLock<SeriesRegistry>,
    pub(in crate::engine::storage_engine) registry_used_bytes: &'a AtomicU64,
    pub(in crate::engine::storage_engine) shared_used_bytes: &'a AtomicU64,
    pub(in crate::engine::storage_engine) used_bytes: &'a AtomicU64,
}

impl<'a> RegistryMemoryContext<'a> {
    pub(in crate::engine::storage_engine) fn sync_registry_memory_usage(self) {
        let registry_used = self.registry.read().memory_usage_bytes();
        reset_included_memory_component_bytes(
            self.accounting_enabled,
            self.registry_used_bytes,
            self.shared_used_bytes,
            self.used_bytes,
            registry_used,
        );
    }
}

#[derive(Clone, Copy)]
pub(in crate::engine::storage_engine) struct RegistryBookkeepingContext<'a> {
    pub(in crate::engine::storage_engine) accounting_enabled: bool,
    pub(in crate::engine::storage_engine) pending_series_ids: &'a RwLock<BTreeSet<SeriesId>>,
    pub(in crate::engine::storage_engine) metadata_used_bytes: &'a AtomicU64,
    pub(in crate::engine::storage_engine) shared_used_bytes: &'a AtomicU64,
    pub(in crate::engine::storage_engine) used_bytes: &'a AtomicU64,
}

impl<'a> RegistryBookkeepingContext<'a> {
    pub(in crate::engine::storage_engine) fn mark_series_pending<I>(self, series_ids: I)
    where
        I: IntoIterator<Item = SeriesId>,
    {
        let mut series_ids = series_ids.into_iter().collect::<Vec<_>>();
        series_ids.sort_unstable();
        series_ids.dedup();
        if series_ids.is_empty() {
            return;
        }

        let mut pending = self.pending_series_ids.write();
        with_included_memory_delta(
            self.accounting_enabled,
            self.metadata_used_bytes,
            self.shared_used_bytes,
            self.used_bytes,
            &mut pending,
            |pending| ChunkStorage::btree_set_series_id_memory_usage_bytes(pending),
            |pending| {
                for series_id in series_ids {
                    pending.insert(series_id);
                }
            },
        );
    }
}

#[derive(Clone, Copy)]
pub(in crate::engine::storage_engine) struct ShardMemoryAccountingContext<'a> {
    pub(in crate::engine::storage_engine) accounting_enabled: bool,
    pub(in crate::engine::storage_engine) used_bytes_by_shard:
        &'a [AtomicU64; IN_MEMORY_SHARD_COUNT],
    pub(in crate::engine::storage_engine) used_bytes: &'a AtomicU64,
}

impl<'a> ShardMemoryAccountingContext<'a> {
    pub(super) fn used_bytes_value(self) -> usize {
        self.used_bytes
            .load(Ordering::Acquire)
            .min(usize::MAX as u64) as usize
    }

    pub(in crate::engine::storage_engine) fn account_memory_delta(
        self,
        shard_idx: usize,
        delta: MemoryDeltaBytes,
    ) {
        if !self.accounting_enabled {
            return;
        }

        if delta.added_bytes >= delta.removed_bytes {
            let increment =
                saturating_u64_from_usize(delta.added_bytes.saturating_sub(delta.removed_bytes));
            self.used_bytes_by_shard[shard_idx].fetch_add(increment, Ordering::AcqRel);
            self.used_bytes.fetch_add(increment, Ordering::AcqRel);
        } else {
            let decrement =
                saturating_u64_from_usize(delta.removed_bytes.saturating_sub(delta.added_bytes));
            self.used_bytes_by_shard[shard_idx].fetch_sub(decrement, Ordering::AcqRel);
            self.used_bytes.fetch_sub(decrement, Ordering::AcqRel);
        }
    }
}

#[derive(Clone, Copy)]
pub(in crate::engine::storage_engine) struct PersistedSealedBudgetContext<'a> {
    pub(in crate::engine::storage_engine) memory: ShardMemoryAccountingContext<'a>,
    pub(in crate::engine::storage_engine) backpressure_lock: &'a Mutex<()>,
    pub(in crate::engine::storage_engine) has_persisted_lanes: bool,
    pub(in crate::engine::storage_engine) persisted_chunk_watermarks:
        &'a RwLock<HashMap<SeriesId, u64>>,
    pub(in crate::engine::storage_engine) persisted_index: &'a RwLock<PersistedIndexState>,
    pub(in crate::engine::storage_engine) sealed_chunks:
        &'a [SealedChunkShard; IN_MEMORY_SHARD_COUNT],
    pub(in crate::engine::storage_engine) flush_metrics:
        &'a super::super::metrics::FlushObservabilityCounters,
}

impl<'a> PersistedSealedBudgetContext<'a> {
    fn sealed_chunk_is_present_in_persisted_chunks(
        persisted_chunks: Option<&[PersistedChunkRef]>,
        key: SealedChunkKey,
        chunk: &Chunk,
    ) -> bool {
        persisted_chunks.is_some_and(|persisted_chunks| {
            persisted_chunks.iter().any(|chunk_ref| {
                chunk_ref.min_ts == key.min_ts
                    && chunk_ref.max_ts == key.max_ts
                    && chunk_ref.point_count == key.point_count
                    && chunk_ref.lane == chunk.header.lane
                    && chunk_ref.ts_codec == chunk.header.ts_codec
                    && chunk_ref.value_codec == chunk.header.value_codec
            })
        })
    }

    fn find_oldest_evictable_sealed_chunk(self) -> Option<(usize, SeriesId, SealedChunkKey)> {
        let persisted = self.persisted_chunk_watermarks.read();
        let persisted_index = self.persisted_index.read();
        let mut oldest = None;

        for (shard_idx, shard) in self.sealed_chunks.iter().enumerate() {
            let sealed = shard.read();
            for (series_id, chunks) in sealed.iter() {
                let persisted_sequence = persisted.get(series_id).copied().unwrap_or(0);
                let persisted_chunks = persisted_index
                    .chunk_refs
                    .get(series_id)
                    .map(|chunks| chunks.as_slice());
                for (key, chunk) in chunks {
                    if key.sequence > persisted_sequence
                        || !Self::sealed_chunk_is_present_in_persisted_chunks(
                            persisted_chunks,
                            *key,
                            chunk,
                        )
                    {
                        continue;
                    }
                    if oldest
                        .map(|(_, _, current): (usize, SeriesId, SealedChunkKey)| {
                            key.sequence < current.sequence
                        })
                        .unwrap_or(true)
                    {
                        oldest = Some((shard_idx, *series_id, *key));
                    }
                }
            }
        }

        oldest
    }

    fn evict_oldest_persisted_sealed_chunk(self) -> bool {
        let Some((shard_idx, series_id, key)) = self.find_oldest_evictable_sealed_chunk() else {
            return false;
        };

        let mut sealed = self.sealed_chunks[shard_idx].write();
        let Some(chunks) = sealed.get_mut(&series_id) else {
            return false;
        };
        let removed_chunk = chunks.remove(&key);
        if chunks.is_empty() {
            sealed.remove(&series_id);
        }
        let removed = removed_chunk.is_some();
        if let Some(chunk) = removed_chunk.as_ref() {
            self.memory.account_memory_delta(
                shard_idx,
                MemoryDeltaBytes::from_totals(0, ChunkStorage::chunk_memory_usage_bytes(chunk)),
            );
        }
        if removed {
            self.flush_metrics
                .evicted_sealed_chunks_total
                .fetch_add(1, Ordering::Relaxed);
        }
        removed
    }

    pub(in crate::engine::storage_engine) fn evict_persisted_sealed_chunks_to_budget(
        self,
        budget: usize,
    ) {
        if budget == usize::MAX
            || self.memory.used_bytes_value() <= budget
            || !self.has_persisted_lanes
        {
            return;
        }

        let _backpressure_guard = self.backpressure_lock.lock();
        while self.memory.used_bytes_value() > budget {
            if !self.evict_oldest_persisted_sealed_chunk() {
                break;
            }
        }
    }
}

#[derive(Clone, Copy)]
pub(in crate::engine::storage_engine) struct SealedChunkPublishContext<'a> {
    pub(in crate::engine::storage_engine) sealed_chunks:
        &'a [SealedChunkShard; IN_MEMORY_SHARD_COUNT],
    pub(in crate::engine::storage_engine) next_chunk_sequence: &'a AtomicU64,
    pub(in crate::engine::storage_engine) memory: ShardMemoryAccountingContext<'a>,
    #[cfg(test)]
    pub(in crate::engine::storage_engine) pre_publish_hook:
        &'a RwLock<Option<Arc<IngestCommitHook>>>,
}

impl<'a> SealedChunkPublishContext<'a> {
    pub(in crate::engine::storage_engine) fn append_finalized_chunks_to_sealed_shard<I>(
        self,
        shard_idx: usize,
        finalized: I,
    ) where
        I: IntoIterator<Item = (SeriesId, Chunk)>,
    {
        let account_memory = self.memory.accounting_enabled;
        #[cfg(test)]
        if let Some(hook) = self.pre_publish_hook.read().clone() {
            hook();
        }
        let mut sealed = self.sealed_chunks[shard_idx].write();
        let mut memory_delta = MemoryDeltaBytes::default();
        for (series_id, chunk) in finalized {
            let chunk = chunk.into_sealed_storage();
            let chunk_bytes = if account_memory {
                ChunkStorage::chunk_memory_usage_bytes(&chunk)
            } else {
                0
            };
            let sequence = self.next_chunk_sequence.fetch_add(1, Ordering::SeqCst);
            let key = SealedChunkKey::from_chunk(&chunk, sequence);
            let replaced = sealed
                .entry(series_id)
                .or_default()
                .insert(key, Arc::new(chunk));
            if account_memory {
                memory_delta.record_replacement(chunk_bytes, replaced.as_ref(), |chunk| {
                    ChunkStorage::chunk_memory_usage_bytes(chunk)
                });
            }
        }
        drop(sealed);
        self.memory.account_memory_delta(shard_idx, memory_delta);
    }
}