//! tsink 0.10.2
//!
//! A lightweight embedded time-series database with a straightforward API.
//! See the crate documentation for details.
use super::pagination::{RawSeriesPagination, RawSeriesScanPage};
use super::snapshot::SeriesReadSnapshot;
use super::*;

impl ChunkStorage {
    /// Reads `series_id` over `[start, end)` by decoding every tier into `out`
    /// (append), then sorting/deduping as required (sort) — the "append-sort"
    /// read path. `out` is cleared first; the caller owns the buffer so it can
    /// be reused across reads.
    ///
    /// Returns per-tier fetch statistics for the persisted chunks.
    pub(super) fn execute_series_read_append_sort_path(
        &self,
        series_id: SeriesId,
        start: i64,
        end: i64,
        snapshot: SeriesReadSnapshot,
        out: &mut Vec<DataPoint>,
    ) -> Result<PersistedTierFetchStats> {
        out.clear();

        let SeriesReadSnapshot {
            persisted,
            sealed_chunks,
            active_points,
            analysis,
        } = snapshot;

        let stats = decode_append_sort_sources_into(
            &persisted,
            &sealed_chunks,
            &active_points,
            start,
            end,
            out,
        )?;

        self.finalize_append_sort_points(series_id, analysis, out);
        Ok(stats)
    }

    /// Runs the append-sort read path and slices the result into one
    /// pagination window, reporting how many rows were consumed and whether
    /// the scan reached the end of the series.
    pub(super) fn collect_raw_series_page_with_append_sort(
        &self,
        series_id: SeriesId,
        start: i64,
        end: i64,
        snapshot: SeriesReadSnapshot,
        pagination: RawSeriesPagination,
    ) -> Result<RawSeriesScanPage> {
        let mut buffered = Vec::new();
        let stats = self.execute_series_read_append_sort_path(
            series_id,
            start,
            end,
            snapshot,
            &mut buffered,
        )?;

        // Capture totals before trimming: `reached_end` compares what the
        // pagination window consumed against everything that was available.
        let available = buffered.len();
        let consumed = pagination.rows_consumed(available);
        apply_offset_limit_in_place(&mut buffered, pagination.offset, pagination.limit);

        let page = RawSeriesScanPage {
            points: buffered,
            final_rows_seen: saturating_u64_from_usize(consumed),
            reached_end: consumed >= available,
            stats,
        };
        Ok(page)
    }

    /// Post-processes decoded points: retention filter, then (only when the
    /// analysis says reordering is needed) sort + dedupe, then tombstones.
    fn finalize_append_sort_points(
        &self,
        series_id: SeriesId,
        analysis: super::analysis::SeriesReadAnalysis,
        out: &mut Vec<DataPoint>,
    ) {
        use super::pagination::SortedSeriesDedupeMode;

        self.apply_retention_filter(out);

        if analysis.needs_append_sort_reorder() {
            // Skip the sort entirely when the buffer is already ordered.
            // NOTE: the sort must stay stable — the Timestamp dedupe mode
            // keeps the *last* value per timestamp, which relies on insertion
            // order being preserved among equal timestamps.
            if !points_are_sorted_by_timestamp(out) {
                out.sort_by_key(|point| point.timestamp);
            }

            match analysis.sorted_dedupe_mode() {
                SortedSeriesDedupeMode::Timestamp => dedupe_last_value_per_timestamp(out),
                SortedSeriesDedupeMode::Exact => dedupe_exact_duplicate_points(out),
                SortedSeriesDedupeMode::None => {}
            }
        }

        self.apply_tombstone_filter(series_id, out);
    }
}

/// Appends every point in `[start, end)` from all three storage tiers into
/// `out`, in tier order: persisted chunks, sealed in-memory chunks, then the
/// active (unsealed) buffer. No sorting or deduping happens here — that is the
/// caller's job (the "sort" half of the append-sort path).
///
/// Returns decode timing/count statistics for the persisted tier only.
fn decode_append_sort_sources_into(
    persisted: &super::snapshot::PersistedSeriesSourceSnapshot,
    sealed_chunks: &[Arc<Chunk>],
    active_points: &ActiveSeriesSnapshot,
    start: i64,
    end: i64,
    out: &mut Vec<DataPoint>,
) -> Result<PersistedTierFetchStats> {
    let mut stats = PersistedTierFetchStats::default();

    // Persisted tier: fetch each chunk's payload and decode the in-range
    // points. Timing covers payload fetch + decode for that chunk.
    for chunk_ref in persisted.chunks.iter() {
        let started = Instant::now();
        let payload = persisted_chunk_payload(&persisted.segment_maps, chunk_ref)?;
        let descriptor = EncodedChunkDescriptor {
            lane: chunk_ref.lane,
            ts_codec: chunk_ref.ts_codec,
            value_codec: chunk_ref.value_codec,
            point_count: chunk_ref.point_count as usize,
        };
        decode_encoded_chunk_payload_in_range_into(descriptor, payload.as_ref(), start, end, out)?;
        stats.record_chunk(persisted.chunk_tier(chunk_ref), elapsed_nanos_u64(started));
    }

    // Sealed in-memory tier.
    for chunk in sealed_chunks.iter() {
        decode_chunk_points_in_range_into(chunk, start, end, out)?;
    }

    // Active tier: half-open range filter, partition order preserved.
    out.extend(
        active_points
            .iter_points_in_partition_order()
            .filter(|point| point.ts >= start && point.ts < end)
            .map(|point| DataPoint::new(point.ts, point.value.clone())),
    );

    Ok(stats)
}