tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use super::*;
use std::borrow::Cow;
use std::time::Instant;

#[derive(Clone, Copy, Debug, Default)]
pub(in crate::engine::storage_engine) struct MemoryDeltaBytes {
    pub(in crate::engine::storage_engine) added_bytes: usize,
    pub(in crate::engine::storage_engine) removed_bytes: usize,
}

impl MemoryDeltaBytes {
    pub(in crate::engine::storage_engine) fn from_totals(
        added_bytes: usize,
        removed_bytes: usize,
    ) -> Self {
        Self {
            added_bytes,
            removed_bytes,
        }
    }

    pub(in crate::engine::storage_engine) fn between(before: usize, after: usize) -> Self {
        let mut delta = Self::default();
        delta.record_change(before, after);
        delta
    }

    pub(in crate::engine::storage_engine) fn record_change(&mut self, before: usize, after: usize) {
        if after >= before {
            self.record_addition(after.saturating_sub(before));
        } else {
            self.record_removal(before.saturating_sub(after));
        }
    }

    pub(in crate::engine::storage_engine) fn record_addition(&mut self, bytes: usize) {
        self.added_bytes = self.added_bytes.saturating_add(bytes);
    }

    pub(in crate::engine::storage_engine) fn record_removal(&mut self, bytes: usize) {
        self.removed_bytes = self.removed_bytes.saturating_add(bytes);
    }

    pub(in crate::engine::storage_engine) fn record_replacement<T>(
        &mut self,
        added_bytes: usize,
        replaced: Option<&T>,
        measure: impl Fn(&T) -> usize,
    ) {
        self.record_addition(added_bytes);
        if let Some(replaced) = replaced {
            self.record_removal(measure(replaced));
        }
    }
}

pub(in crate::engine::storage_engine) fn saturating_u64_from_usize(value: usize) -> u64 {
    value.min(u64::MAX as usize) as u64
}

pub(in crate::engine::storage_engine) fn elapsed_nanos_u64(started: Instant) -> u64 {
    started.elapsed().as_nanos().min(u64::MAX as u128) as u64
}

pub(in crate::engine::storage_engine) fn current_unix_millis_u64() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|duration| duration.as_millis().min(u64::MAX as u128) as u64)
        .unwrap_or(0)
}

pub(in crate::engine::storage_engine) fn duration_to_timestamp_units(
    duration: Duration,
    precision: TimestampPrecision,
) -> i64 {
    match precision {
        TimestampPrecision::Seconds => i64::try_from(duration.as_secs()).unwrap_or(i64::MAX),
        TimestampPrecision::Milliseconds => i64::try_from(duration.as_millis()).unwrap_or(i64::MAX),
        TimestampPrecision::Microseconds => i64::try_from(duration.as_micros()).unwrap_or(i64::MAX),
        TimestampPrecision::Nanoseconds => i64::try_from(duration.as_nanos()).unwrap_or(i64::MAX),
    }
}

pub(in crate::engine::storage_engine) fn current_unix_timestamp_units(
    precision: TimestampPrecision,
) -> i64 {
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    duration_to_timestamp_units(now, precision)
}

pub(in crate::engine::storage_engine) fn partition_id_for_timestamp(
    timestamp: i64,
    partition_window: i64,
) -> i64 {
    timestamp.div_euclid(partition_window.max(1))
}

pub(in crate::engine::storage_engine) fn value_heap_bytes(value: &Value) -> usize {
    match value {
        Value::Bytes(bytes) => bytes.capacity(),
        Value::String(text) => text.capacity(),
        Value::Histogram(histogram) => {
            std::mem::size_of::<crate::NativeHistogram>()
                + histogram.negative_spans.capacity()
                    * std::mem::size_of::<crate::HistogramBucketSpan>()
                + histogram.negative_deltas.capacity() * std::mem::size_of::<i64>()
                + histogram.negative_counts.capacity() * std::mem::size_of::<f64>()
                + histogram.positive_spans.capacity()
                    * std::mem::size_of::<crate::HistogramBucketSpan>()
                + histogram.positive_deltas.capacity() * std::mem::size_of::<i64>()
                + histogram.positive_counts.capacity() * std::mem::size_of::<f64>()
                + histogram.custom_values.capacity() * std::mem::size_of::<f64>()
        }
        _ => 0,
    }
}

pub(in crate::engine::storage_engine) fn lane_for_value(value: &Value) -> ValueLane {
    match value {
        Value::Bytes(_) | Value::String(_) | Value::Histogram(_) => ValueLane::Blob,
        _ => ValueLane::Numeric,
    }
}

pub(in crate::engine::storage_engine) fn persisted_chunk_payload<'a>(
    persisted_segment_maps: &'a HashMap<usize, Arc<PlatformMmap>>,
    chunk_ref: &PersistedChunkRef,
) -> Result<Cow<'a, [u8]>> {
    let Some(mapped_segment) = persisted_segment_maps.get(&chunk_ref.segment_slot) else {
        return Err(TsinkError::DataCorruption(format!(
            "missing mapped segment slot {}",
            chunk_ref.segment_slot
        )));
    };
    crate::engine::segment::chunk_payload_from_record(
        mapped_segment.as_slice(),
        chunk_ref.chunk_offset,
        chunk_ref.chunk_len,
    )
}

impl ChunkStorage {
    pub(in crate::engine::storage_engine) fn ensure_open(&self) -> Result<()> {
        if self
            .observability
            .health
            .fail_fast_triggered
            .load(Ordering::SeqCst)
        {
            return Err(TsinkError::StorageShuttingDown);
        }
        if self.coordination.lifecycle.load(Ordering::SeqCst) != STORAGE_OPEN {
            return Err(TsinkError::StorageClosed);
        }
        Ok(())
    }

    pub(in crate::engine::storage_engine) fn install_data_path_process_lock(
        &self,
        data_path_process_lock: DataPathProcessLock,
    ) {
        *self.coordination.data_path_process_lock.lock() = Some(data_path_process_lock);
    }

    pub(in crate::engine::storage_engine) fn release_data_path_process_lock(&self) {
        self.coordination.data_path_process_lock.lock().take();
    }

    pub(in crate::engine::storage_engine) fn background_maintenance_gate(
        &self,
    ) -> MutexGuard<'_, ()> {
        self.coordination.background_maintenance_lock.lock()
    }

    pub(in crate::engine::storage_engine) fn compaction_gate(&self) -> MutexGuard<'_, ()> {
        self.coordination.compaction_lock.lock()
    }

    pub(in crate::engine::storage_engine) fn lock_compaction_gate(
        compaction_lock: &Mutex<()>,
    ) -> MutexGuard<'_, ()> {
        compaction_lock.lock()
    }

    pub(in crate::engine::storage_engine) fn current_timestamp_units(&self) -> i64 {
        #[cfg(test)]
        {
            let override_timestamp = self.current_time_override.load(Ordering::Acquire);
            if override_timestamp != i64::MIN {
                return override_timestamp;
            }
        }

        current_unix_timestamp_units(self.runtime.timestamp_precision)
    }

    pub(in crate::engine::storage_engine) fn current_future_skew_cutoff(&self) -> i64 {
        self.current_timestamp_units()
            .saturating_add(self.runtime.future_skew_window)
    }

    #[cfg(test)]
    pub(in crate::engine::storage_engine) fn set_current_time_override(&self, timestamp: i64) {
        self.current_time_override
            .store(timestamp, Ordering::Release);
        self.bump_live_series_pruning_generation();
    }
}