// tsink 0.10.2
//
// A lightweight embedded time-series database with a straightforward API.
// Documentation
use std::collections::hash_map::Entry;

use super::*;
use crate::engine::binio::{read_bytes, read_u32};
use crate::engine::chunk::{ValueCodecId, ValueLane};
use crate::{Result, TsinkError, Value};

impl SeriesValueFamily {
    pub fn from_value(value: &Value, lane: ValueLane) -> Result<Self> {
        match (value, lane) {
            (Value::F64(_), ValueLane::Numeric) => Ok(Self::F64),
            (Value::I64(_), ValueLane::Numeric) => Ok(Self::I64),
            (Value::U64(_), ValueLane::Numeric) => Ok(Self::U64),
            (Value::Bool(_), ValueLane::Numeric) => Ok(Self::Bool),
            (Value::Bytes(_) | Value::String(_), ValueLane::Blob) => Ok(Self::Blob),
            (Value::Histogram(_), ValueLane::Blob) => Ok(Self::Histogram),
            (_, ValueLane::Numeric) => Err(TsinkError::ValueTypeMismatch {
                expected: "numeric lane value".to_string(),
                actual: value.kind().to_string(),
            }),
            (_, ValueLane::Blob) => Err(TsinkError::ValueTypeMismatch {
                expected: "blob lane value".to_string(),
                actual: value.kind().to_string(),
            }),
        }
    }

    pub fn from_encoded(
        lane: ValueLane,
        value_codec: ValueCodecId,
        payload: &[u8],
    ) -> Result<Self> {
        match value_codec {
            ValueCodecId::GorillaXorF64 => {
                if lane != ValueLane::Numeric {
                    return Err(TsinkError::DataCorruption(
                        "f64 codec stored in blob lane".to_string(),
                    ));
                }
                Ok(Self::F64)
            }
            ValueCodecId::ZigZagDeltaBitpackI64 => {
                if lane != ValueLane::Numeric {
                    return Err(TsinkError::DataCorruption(
                        "i64 codec stored in blob lane".to_string(),
                    ));
                }
                Ok(Self::I64)
            }
            ValueCodecId::DeltaBitpackU64 => {
                if lane != ValueLane::Numeric {
                    return Err(TsinkError::DataCorruption(
                        "u64 codec stored in blob lane".to_string(),
                    ));
                }
                Ok(Self::U64)
            }
            ValueCodecId::BoolBitpack => {
                if lane != ValueLane::Numeric {
                    return Err(TsinkError::DataCorruption(
                        "bool codec stored in blob lane".to_string(),
                    ));
                }
                Ok(Self::Bool)
            }
            ValueCodecId::BytesDeltaBlock => {
                let value_payload = Self::value_payload_from_encoded_chunk(payload)?;
                let Some(&tag) = value_payload.first() else {
                    return Err(TsinkError::DataCorruption(
                        "blob delta-block payload is empty".to_string(),
                    ));
                };
                Self::from_encoded_tag(tag, lane, "blob delta-block")
            }
            ValueCodecId::ConstantRle => {
                let value_payload = Self::value_payload_from_encoded_chunk(payload)?;
                let Some(&tag) = value_payload.first() else {
                    return Err(TsinkError::DataCorruption(
                        "constant-rle payload is empty".to_string(),
                    ));
                };
                Self::from_encoded_tag(tag, lane, "constant-rle")
            }
        }
    }

    pub fn name(self) -> &'static str {
        match self {
            Self::F64 => "f64",
            Self::I64 => "i64",
            Self::U64 => "u64",
            Self::Bool => "bool",
            Self::Blob => "bytes/string",
            Self::Histogram => "histogram",
        }
    }

    fn from_encoded_tag(tag: u8, lane: ValueLane, context: &str) -> Result<Self> {
        match lane {
            ValueLane::Numeric => match tag {
                1 => Ok(Self::F64),
                2 => Ok(Self::I64),
                3 => Ok(Self::U64),
                4 => Ok(Self::Bool),
                5..=7 => Err(TsinkError::DataCorruption(format!(
                    "{context} encoded blob tag {tag} in numeric lane"
                ))),
                _ => Err(TsinkError::DataCorruption(format!(
                    "unknown {context} value tag {tag}"
                ))),
            },
            ValueLane::Blob => match tag {
                5 | 6 => Ok(Self::Blob),
                7 => Ok(Self::Histogram),
                1..=4 => Err(TsinkError::DataCorruption(format!(
                    "{context} encoded numeric tag {tag} in blob lane"
                ))),
                _ => Err(TsinkError::DataCorruption(format!(
                    "unknown {context} value tag {tag}"
                ))),
            },
        }
    }

    fn value_payload_from_encoded_chunk(payload: &[u8]) -> Result<&[u8]> {
        let mut pos = 0usize;
        let ts_len = read_u32(payload, &mut pos)? as usize;
        let _ts_payload = read_bytes(payload, &mut pos, ts_len)?;
        let value_len = read_u32(payload, &mut pos)? as usize;
        let value_payload = read_bytes(payload, &mut pos, value_len)?;
        if pos != payload.len() {
            return Err(TsinkError::DataCorruption(
                "encoded chunk payload has trailing bytes".to_string(),
            ));
        }
        Ok(value_payload)
    }
}

impl SeriesRegistry {
    /// Looks up the value family currently recorded for `series_id`.
    ///
    /// Returns `None` when no shard index can be resolved for the series or when the shard holds
    /// no family entry for it.
    pub fn series_value_family(&self, series_id: SeriesId) -> Option<SeriesValueFamily> {
        let shard_idx = self.load_series_registry_shard_idx(series_id)?;
        let shard = self.series_shards[shard_idx].read();
        shard.value_families.get(&series_id).copied()
    }

    /// Records `family` for `series_id` unless a family is already present.
    ///
    /// Returns `Ok(true)` when the family was newly stored (the registry-wide and per-shard
    /// memory estimates are increased) and `Ok(false)` when the same family was already recorded.
    ///
    /// # Errors
    ///
    /// * `TsinkError::DataCorruption` if no shard index resolves for the series or the shard's
    ///   `by_id` map does not contain it.
    /// * `TsinkError::ValueTypeMismatch` if a different family is already recorded.
    pub fn assign_series_value_family_if_missing(
        &self,
        series_id: SeriesId,
        family: SeriesValueFamily,
    ) -> Result<bool> {
        // Both "unknown series" exits report the same error.
        let unknown_series = || {
            TsinkError::DataCorruption(format!(
                "cannot assign value family to unknown series id {}",
                series_id
            ))
        };

        let shard_idx = self
            .load_series_registry_shard_idx(series_id)
            .ok_or_else(unknown_series)?;
        let mut shard = self.series_shards[shard_idx].write();
        if !shard.by_id.contains_key(&series_id) {
            return Err(unknown_series());
        }

        match shard.value_families.entry(series_id) {
            Entry::Vacant(slot) => {
                slot.insert(family);
                self.add_estimated_memory_bytes(Self::value_family_entry_bytes());
                let grown = shard
                    .estimated_series_bytes
                    .saturating_add(Self::value_family_entry_bytes());
                shard.estimated_series_bytes = grown;
                Ok(true)
            }
            Entry::Occupied(slot) => {
                let existing = *slot.get();
                if existing == family {
                    Ok(false)
                } else {
                    Err(TsinkError::ValueTypeMismatch {
                        expected: existing.name().to_string(),
                        actual: family.name().to_string(),
                    })
                }
            }
        }
    }

    /// Records `family` for `series_id`, treating any conflict as data corruption.
    ///
    /// Behaves like [`Self::assign_series_value_family_if_missing`] but returns `()` on success
    /// and reports a conflicting existing family as `DataCorruption` rather than
    /// `ValueTypeMismatch`.
    ///
    /// # Errors
    ///
    /// Returns `TsinkError::DataCorruption` if the series is unknown or if a different family is
    /// already recorded for it.
    pub fn record_series_value_family(
        &self,
        series_id: SeriesId,
        family: SeriesValueFamily,
    ) -> Result<()> {
        let unknown_series = || {
            TsinkError::DataCorruption(format!(
                "cannot record value family for unknown series id {}",
                series_id
            ))
        };

        let shard_idx = self
            .load_series_registry_shard_idx(series_id)
            .ok_or_else(unknown_series)?;
        let mut shard = self.series_shards[shard_idx].write();
        if !shard.by_id.contains_key(&series_id) {
            return Err(unknown_series());
        }

        match shard.value_families.entry(series_id) {
            Entry::Vacant(slot) => {
                slot.insert(family);
                self.add_estimated_memory_bytes(Self::value_family_entry_bytes());
                let grown = shard
                    .estimated_series_bytes
                    .saturating_add(Self::value_family_entry_bytes());
                shard.estimated_series_bytes = grown;
                Ok(())
            }
            Entry::Occupied(slot) => {
                let existing = *slot.get();
                if existing == family {
                    Ok(())
                } else {
                    Err(TsinkError::DataCorruption(format!(
                        "series id {} mixes value families across history: expected {}, got {}",
                        series_id,
                        existing.name(),
                        family.name(),
                    )))
                }
            }
        }
    }

    /// Removes any recorded value family for `series_id`.
    ///
    /// Does nothing when no shard index resolves for the series or when no family was recorded.
    /// When an entry is removed, the registry-wide and per-shard memory estimates are decreased.
    pub fn clear_series_value_family(&self, series_id: SeriesId) {
        if let Some(shard_idx) = self.load_series_registry_shard_idx(series_id) {
            let mut shard = self.series_shards[shard_idx].write();
            let removed = shard.value_families.remove(&series_id);
            if removed.is_none() {
                return;
            }

            self.sub_estimated_memory_bytes(Self::value_family_entry_bytes());
            let shrunk = shard
                .estimated_series_bytes
                .saturating_sub(Self::value_family_entry_bytes());
            shard.estimated_series_bytes = shrunk;
        }
    }
}

/// Encodes an optional value family as its `u16` registry tag.
///
/// `None` is encoded as 0; the families map to 1 (`F64`), 2 (`I64`), 3 (`U64`), 4 (`Bool`),
/// 5 (`Blob`) and 6 (`Histogram`). `decode_optional_series_value_family` accepts exactly these
/// tags.
pub(super) fn encode_optional_series_value_family(family: Option<SeriesValueFamily>) -> u16 {
    // Tag 0 is reserved for "no family recorded".
    let Some(family) = family else {
        return 0;
    };

    match family {
        SeriesValueFamily::F64 => 1,
        SeriesValueFamily::I64 => 2,
        SeriesValueFamily::U64 => 3,
        SeriesValueFamily::Bool => 4,
        SeriesValueFamily::Blob => 5,
        SeriesValueFamily::Histogram => 6,
    }
}

/// Decodes a `u16` registry tag back into an optional value family.
///
/// Tag 0 decodes to `None`; tags 1 through 6 decode to `F64`, `I64`, `U64`, `Bool`, `Blob` and
/// `Histogram` respectively.
///
/// # Errors
///
/// Returns `TsinkError::DataCorruption` for any tag greater than 6.
pub(super) fn decode_optional_series_value_family(raw: u16) -> Result<Option<SeriesValueFamily>> {
    let family = match raw {
        // "No family recorded" is a valid, non-error outcome.
        0 => return Ok(None),
        1 => SeriesValueFamily::F64,
        2 => SeriesValueFamily::I64,
        3 => SeriesValueFamily::U64,
        4 => SeriesValueFamily::Bool,
        5 => SeriesValueFamily::Blob,
        6 => SeriesValueFamily::Histogram,
        _ => {
            return Err(TsinkError::DataCorruption(format!(
                "unknown registry value family tag {raw}"
            )));
        }
    };

    Ok(Some(family))
}