tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use super::*;

impl ChunkStorage {
    pub(in crate::engine::storage_engine) fn build_visibility_state() -> VisibilityState {
        VisibilityState {
            tombstones: RwLock::new(HashMap::new()),
            materialized_series: RwLock::new(BTreeSet::new()),
            series_visibility_summaries: RwLock::new(HashMap::new()),
            series_visible_max_timestamps: RwLock::new(HashMap::new()),
            series_visible_bounded_max_timestamps: RwLock::new(HashMap::new()),
            visibility_state_generation: AtomicU64::new(0),
            live_series_pruning_generation: AtomicU64::new(0),
            max_observed_timestamp: AtomicI64::new(i64::MIN),
            max_bounded_observed_timestamp: AtomicI64::new(i64::MIN),
            recency_state_lock: Mutex::new(()),
            flush_visibility_lock: RwLock::new(()),
        }
    }

    pub(in crate::engine::storage_engine) fn visibility_state_generation(&self) -> u64 {
        self.visibility
            .visibility_state_generation
            .load(Ordering::Acquire)
    }

    pub(in crate::engine::storage_engine) fn bump_visibility_state_generation(&self) {
        self.visibility
            .visibility_state_generation
            .fetch_add(1, Ordering::AcqRel);
    }

    pub(in crate::engine::storage_engine) fn visibility_read_fence(
        &self,
    ) -> RwLockReadGuard<'_, ()> {
        self.visibility.flush_visibility_lock.read()
    }

    pub(in crate::engine::storage_engine) fn visibility_write_fence(
        &self,
    ) -> RwLockWriteGuard<'_, ()> {
        self.visibility.flush_visibility_lock.write()
    }

    pub(in crate::engine::storage_engine) fn with_visibility_write_stage<R>(
        &self,
        f: impl FnOnce() -> Result<R>,
    ) -> Result<R> {
        let _visibility_guard = self.visibility_write_fence();
        f()
    }

    fn update_atomic_max_timestamp(target: &AtomicI64, ts: i64) {
        let mut current = target.load(Ordering::Acquire);
        while ts > current {
            match target.compare_exchange_weak(current, ts, Ordering::AcqRel, Ordering::Acquire) {
                Ok(_) => break,
                Err(actual) => current = actual,
            }
        }
    }

    pub(in crate::engine::storage_engine) fn update_max_observed_timestamp(&self, ts: i64) {
        Self::update_atomic_max_timestamp(&self.visibility.max_observed_timestamp, ts);
    }

    pub(in crate::engine::storage_engine) fn max_observed_timestamp(&self) -> Option<i64> {
        let max_observed = self
            .visibility
            .max_observed_timestamp
            .load(Ordering::Acquire);
        (max_observed != i64::MIN).then_some(max_observed)
    }

    pub(in crate::engine::storage_engine) fn bounded_recency_reference_timestamp(
        &self,
    ) -> Option<i64> {
        let max_bounded = self
            .visibility
            .max_bounded_observed_timestamp
            .load(Ordering::Acquire);
        (max_bounded != i64::MIN).then_some(max_bounded)
    }

    pub(in crate::engine::storage_engine) fn retention_recency_reference_timestamp(
        &self,
    ) -> Option<i64> {
        let max_observed = self
            .visibility
            .max_observed_timestamp
            .load(Ordering::Acquire);
        let bounded_reference = self.bounded_recency_reference_timestamp();
        if max_observed == i64::MIN && bounded_reference.is_none() {
            return None;
        }

        Some(
            bounded_reference
                .map(|reference| reference.max(self.current_timestamp_units()))
                .unwrap_or_else(|| self.current_timestamp_units()),
        )
    }
}