tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use super::super::chunk::ChunkBuilderSnapshotCursor;
use super::super::*;

#[cfg(test)]
std::thread_local! {
    static ACTIVE_SERIES_SNAPSHOT_FLATTEN_COUNT: std::cell::Cell<usize> =
        const { std::cell::Cell::new(0) };
    static ACTIVE_SERIES_SNAPSHOT_POINT_COUNT: std::cell::Cell<usize> =
        const { std::cell::Cell::new(0) };
}

#[derive(Debug, Clone, Default)]
pub(in crate::engine::storage_engine) struct ActivePartitionSnapshot {
    points: chunk::ChunkBuilderSnapshot,
}

impl ActivePartitionSnapshot {
    pub(in crate::engine::storage_engine) fn from_points(
        points: chunk::ChunkBuilderSnapshot,
    ) -> Self {
        Self { points }
    }

    pub(in crate::engine::storage_engine) fn is_empty(&self) -> bool {
        self.points.is_empty()
    }

    #[cfg(test)]
    #[allow(dead_code)]
    pub(in crate::engine::storage_engine) fn point_count(&self) -> usize {
        self.points.len()
    }

    pub(in crate::engine::storage_engine) fn iter_points(
        &self,
    ) -> impl Iterator<Item = &ChunkPoint> + '_ {
        self.points.iter_points()
    }

    pub(in crate::engine::storage_engine) fn into_cursor(self) -> ChunkBuilderSnapshotCursor {
        self.points.into_cursor()
    }

    #[cfg(test)]
    #[allow(dead_code)]
    pub(in crate::engine::storage_engine) fn into_points(self) -> Vec<ChunkPoint> {
        self.points.into_points()
    }
}

#[derive(Debug, Clone, Default)]
pub(in crate::engine::storage_engine) struct ActiveSeriesSnapshot {
    partition_heads: Vec<ActivePartitionSnapshot>,
}

pub(in crate::engine::storage_engine) struct ActiveSeriesSnapshotCursor {
    partition_heads: std::vec::IntoIter<ActivePartitionSnapshot>,
    current_partition: Option<ChunkBuilderSnapshotCursor>,
}

impl ActiveSeriesSnapshot {
    pub(in crate::engine::storage_engine) fn from_partition_heads(
        partition_heads: Vec<ActivePartitionSnapshot>,
    ) -> Self {
        Self { partition_heads }
    }

    #[cfg(test)]
    #[allow(dead_code)]
    pub(in crate::engine::storage_engine) fn point_count(&self) -> usize {
        self.partition_heads
            .iter()
            .map(ActivePartitionSnapshot::point_count)
            .sum()
    }

    pub(in crate::engine::storage_engine) fn iter_points_in_partition_order(
        &self,
    ) -> impl Iterator<Item = &ChunkPoint> + '_ {
        self.partition_heads
            .iter()
            .flat_map(ActivePartitionSnapshot::iter_points)
    }

    pub(in crate::engine::storage_engine) fn into_cursor(self) -> ActiveSeriesSnapshotCursor {
        ActiveSeriesSnapshotCursor {
            partition_heads: self.partition_heads.into_iter(),
            current_partition: None,
        }
    }

    #[cfg(test)]
    #[allow(dead_code)]
    pub(in crate::engine::storage_engine) fn into_points_in_partition_order(
        self,
    ) -> Vec<ChunkPoint> {
        #[cfg(test)]
        ACTIVE_SERIES_SNAPSHOT_FLATTEN_COUNT.with(|count| {
            count.set(count.get().saturating_add(1));
        });

        let mut points = Vec::with_capacity(self.point_count());
        for partition in self.partition_heads {
            points.extend(partition.into_points());
        }
        points
    }
}

impl ActiveSeriesSnapshotCursor {
    fn ensure_head(&mut self) -> Option<&ChunkPoint> {
        loop {
            if self.current_partition.is_none() {
                self.current_partition = Some(self.partition_heads.next()?.into_cursor());
            }

            if self
                .current_partition
                .as_ref()
                .and_then(ChunkBuilderSnapshotCursor::peek)
                .is_some()
            {
                return self
                    .current_partition
                    .as_ref()
                    .and_then(ChunkBuilderSnapshotCursor::peek);
            }

            self.current_partition = None;
        }
    }

    pub(in crate::engine::storage_engine) fn peek(&mut self) -> Option<&ChunkPoint> {
        self.ensure_head()
    }

    pub(in crate::engine::storage_engine) fn advance(&mut self) -> bool {
        if self.ensure_head().is_none() {
            return false;
        }

        self.current_partition
            .as_mut()
            .expect("active snapshot cursor must have a partition after ensure_head")
            .advance()
    }
}

#[cfg(test)]
pub(in crate::engine) fn reset_active_series_snapshot_flatten_count() {
    ACTIVE_SERIES_SNAPSHOT_FLATTEN_COUNT.with(|count| count.set(0));
}

#[cfg(test)]
pub(in crate::engine) fn active_series_snapshot_flatten_count() -> usize {
    ACTIVE_SERIES_SNAPSHOT_FLATTEN_COUNT.with(std::cell::Cell::get)
}

#[cfg(test)]
pub(in crate::engine) fn reset_active_series_snapshot_point_count() {
    ACTIVE_SERIES_SNAPSHOT_POINT_COUNT.with(|count| count.set(0));
}

#[cfg(test)]
pub(in crate::engine) fn active_series_snapshot_point_count() -> usize {
    ACTIVE_SERIES_SNAPSHOT_POINT_COUNT.with(std::cell::Cell::get)
}

#[cfg(test)]
pub(in crate::engine::storage_engine) fn record_active_series_snapshot_point_count(
    point_count: usize,
) {
    ACTIVE_SERIES_SNAPSHOT_POINT_COUNT.with(|count| {
        count.set(count.get().saturating_add(point_count));
    });
}