tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use tempfile::TempDir;

use super::analysis::{analyze_series_read_sources, SeriesReadAnalysis};
use super::merge::{pop_next_point_from_sources, QueryMergeCursor, QueryMergeSourceRef};
use super::pagination::{RawSeriesPagination, SortedSeriesDedupeMode, SortedSeriesPageCollector};
use super::*;
use crate::engine::chunk::ChunkHeader;

/// Returns the engine's default future-skew allowance expressed in timestamp units of
/// `precision`, as computed by `duration_to_timestamp_units`.
fn default_future_skew_window(precision: TimestampPrecision) -> i64 {
    let allowance = super::super::DEFAULT_FUTURE_SKEW_ALLOWANCE;
    super::super::duration_to_timestamp_units(allowance, precision)
}

/// Builds the `ChunkStorageOptions` shared by the storage fixtures in this module.
///
/// The options use nanosecond timestamps, leave retention unenforced, set the window,
/// memory, cardinality and WAL limits to their type maxima, and disable background threads.
/// `current_time_override` is only stored on the options in `cfg(test)` builds.
fn test_options(current_time_override: Option<i64>) -> ChunkStorageOptions {
    let future_skew_window = default_future_skew_window(TimestampPrecision::Nanoseconds);
    let max_active_partition_heads_per_series =
        crate::storage::DEFAULT_MAX_ACTIVE_PARTITION_HEADS_PER_SERIES;

    ChunkStorageOptions {
        // Time handling.
        timestamp_precision: TimestampPrecision::Nanoseconds,
        retention_window: i64::MAX,
        future_skew_window,
        retention_enforced: false,
        // Runtime and partitioning.
        runtime_mode: StorageRuntimeMode::ReadWrite,
        partition_window: i64::MAX,
        max_active_partition_heads_per_series,
        // Write admission limits.
        max_writers: 2,
        write_timeout: Duration::from_secs(2),
        memory_budget_bytes: u64::MAX,
        cardinality_limit: usize::MAX,
        wal_size_limit_bytes: u64::MAX,
        admission_poll_interval: DEFAULT_ADMISSION_POLL_INTERVAL,
        // Background work is switched off for these fixtures.
        compaction_interval: DEFAULT_COMPACTION_INTERVAL,
        background_threads_enabled: false,
        background_fail_fast: false,
        // Metadata sharding and remote/tiered storage.
        metadata_shard_count: None,
        remote_segment_cache_policy: RemoteSegmentCachePolicy::MetadataOnly,
        remote_segment_refresh_interval: Duration::from_secs(5),
        tiered_storage: None,
        #[cfg(test)]
        current_time_override,
    }
}

/// Creates a `ChunkStorage` with no data paths configured, using [`test_options`] without a
/// clock override.
///
/// Panics if the storage cannot be constructed.
fn live_storage(chunk_point_cap: usize) -> ChunkStorage {
    let options = test_options(None);
    let storage = ChunkStorage::new_with_data_path_and_options(
        chunk_point_cap,
        None,
        None,
        None,
        1,
        options,
    );
    storage.unwrap()
}

/// Creates a `ChunkStorage` whose only configured data path is `path/NUMERIC_LANE_ROOT`,
/// using [`test_options`] without a clock override.
///
/// Panics if the storage cannot be constructed.
fn persistent_numeric_storage(path: &std::path::Path, chunk_point_cap: usize) -> ChunkStorage {
    let numeric_root = path.join(NUMERIC_LANE_ROOT);
    let options = test_options(None);
    let storage = ChunkStorage::new_with_data_path_and_options(
        chunk_point_cap,
        None,
        Some(numeric_root),
        None,
        1,
        options,
    );
    storage.unwrap()
}

/// Builds a numeric `F64` chunk for `series_id` that stores `points` in exactly the order
/// given (no sorting is applied).
///
/// The header's `min_ts`/`max_ts` are the smallest and largest timestamps in `points`; for an
/// empty slice they fall back to `i64::MIN` and `i64::MAX`. `point_count` saturates at
/// `u16::MAX`, and the encoded payload is left empty.
fn unsorted_chunk(series_id: SeriesId, points: &[(i64, f64)]) -> Arc<Chunk> {
    let timestamps = || points.iter().map(|&(ts, _)| ts);

    let header = ChunkHeader {
        series_id,
        lane: ValueLane::Numeric,
        value_family: Some(SeriesValueFamily::F64),
        point_count: u16::try_from(points.len()).unwrap_or(u16::MAX),
        min_ts: timestamps().min().unwrap_or(i64::MIN),
        max_ts: timestamps().max().unwrap_or(i64::MAX),
        ts_codec: crate::engine::chunk::TimestampCodecId::DeltaOfDeltaBitpack,
        value_codec: crate::engine::chunk::ValueCodecId::GorillaXorF64,
    };

    let mut chunk_points = Vec::with_capacity(points.len());
    for &(ts, value) in points {
        chunk_points.push(ChunkPoint {
            ts,
            value: Value::F64(value),
        });
    }

    Arc::new(Chunk {
        header,
        points: chunk_points,
        encoded_payload: Vec::new(),
        wal_highwater: WalHighWatermark::default(),
    })
}

/// Test double for a merge cursor: yields a fixed list of points in order.
struct FakeMergeCursor {
    /// Points handed out by the cursor, in iteration order.
    points: Vec<DataPoint>,
    /// Index into `points` of the next point to hand out.
    next_idx: usize,
}

impl FakeMergeCursor {
    /// Creates a cursor positioned at the first element of `points`.
    fn new(points: Vec<DataPoint>) -> Self {
        let next_idx = 0;
        Self { points, next_idx }
    }
}

impl QueryMergeCursor for FakeMergeCursor {
    /// Returns the timestamp of the next point without advancing, or `None` when exhausted.
    fn peek_timestamp(&mut self) -> Result<Option<i64>> {
        match self.points.get(self.next_idx) {
            Some(upcoming) => Ok(Some(upcoming.timestamp)),
            None => Ok(None),
        }
    }

    /// Returns a clone of the next point and advances past it; once exhausted it returns
    /// `None` and leaves the position unchanged.
    fn pop_point(&mut self) -> Result<Option<DataPoint>> {
        match self.points.get(self.next_idx) {
            Some(current) => {
                let popped = current.clone();
                self.next_idx = self.next_idx.saturating_add(1);
                Ok(Some(popped))
            }
            None => Ok(None),
        }
    }
}

/// Analyzes a single sealed chunk whose points are out of timestamp order (no persisted
/// chunks, default active snapshot) and checks that the merge path is rejected.
#[test]
fn analyze_series_read_sources_routes_unsorted_sealed_data_to_append_sort() {
    let sealed_chunks = [unsorted_chunk(7, &[(2, 2.0), (1, 1.0)])];
    let active = ActiveSeriesSnapshot::default();

    let analysis = analyze_series_read_sources(&[], &sealed_chunks, &active, 0, 10);

    // NOTE(review): the fixture is out of order, yet `sealed_source_sorted` is expected to be
    // `true` while `requires_output_validation` is `true` — confirm against the analysis
    // implementation that this is the intended encoding.
    let expected = SeriesReadAnalysis {
        estimated_points: 2,
        has_overlap: false,
        requires_output_validation: true,
        requires_timestamp_dedupe: false,
        requires_exact_dedupe: false,
        persisted_source_sorted: true,
        sealed_source_sorted: true,
    };
    assert_eq!(analysis, expected);
    assert!(!analysis.can_use_merge_path());
}

/// Drains three cursors that all contain a point at timestamp 1 and checks that the tied
/// points come out in source order (persisted, sealed, active) before the remaining points
/// follow in ascending timestamp order.
#[test]
fn pop_next_point_from_sources_preserves_source_precedence_on_timestamp_ties() {
    let persisted_points = vec![DataPoint::new(1, 10.0), DataPoint::new(5, 50.0)];
    let sealed_points = vec![DataPoint::new(1, 20.0), DataPoint::new(4, 40.0)];
    let active_points = vec![DataPoint::new(1, 30.0), DataPoint::new(3, 30.5)];

    let mut persisted = FakeMergeCursor::new(persisted_points);
    let mut sealed = FakeMergeCursor::new(sealed_points);
    let mut active = FakeMergeCursor::new(active_points);

    // Array order is the precedence order under test.
    let mut sources = [
        QueryMergeSourceRef::new(&mut persisted),
        QueryMergeSourceRef::new(&mut sealed),
        QueryMergeSourceRef::new(&mut active),
    ];

    let mut merged = Vec::new();
    loop {
        match pop_next_point_from_sources(&mut sources).unwrap() {
            Some(point) => merged.push(point),
            None => break,
        }
    }

    let expected = vec![
        DataPoint::new(1, 10.0),
        DataPoint::new(1, 20.0),
        DataPoint::new(1, 30.0),
        DataPoint::new(3, 30.5),
        DataPoint::new(4, 40.0),
        DataPoint::new(5, 50.0),
    ];
    assert_eq!(merged, expected);
}

/// Feeds five points through a collector configured with `Some(11)`, one tombstone range,
/// timestamp dedupe and `RawSeriesPagination::new(1, Some(1))`, then checks that two rows
/// were counted and only the point at timestamp 13 is returned.
#[test]
fn sorted_series_page_collector_applies_pagination_after_filters_and_dedupe() {
    let tombstones = [tombstone::TombstoneRange { start: 12, end: 13 }];
    let pagination = RawSeriesPagination::new(1, Some(1));
    let mut collector = SortedSeriesPageCollector::new(
        Some(11),
        Some(&tombstones),
        SortedSeriesDedupeMode::Timestamp,
        pagination,
    );

    // None of the pushes is expected to report `true`.
    let inputs: [(i64, f64); 5] = [(10, 10.0), (11, 11.0), (11, 11.5), (12, 12.0), (13, 13.0)];
    for (ts, value) in inputs {
        assert!(!collector.push(DataPoint::new(ts, value)));
    }
    collector.finish();

    assert_eq!(collector.final_rows_seen(), 2);
    let page = collector.into_points();
    assert_eq!(page, vec![DataPoint::new(13, 13.0)]);
}

/// Checks that a snapshot returned by `snapshot_in_memory_series_sources` does not block
/// writers to another series in the same shard while the snapshot value is still alive.
///
/// A helper thread takes the snapshot and keeps it in scope until released; meanwhile a
/// second thread must complete 32 inserts into a same-shard series within 500 ms.
#[test]
fn snapshot_in_memory_series_sources_release_shard_locks_without_select_wrapper() {
    let storage = Arc::new(live_storage(512));

    // Seed the series that will be snapshotted with 192 points (timestamps 0..192).
    let query_metric = "snapshot_query_metric";
    let query_labels = vec![Label::new("host", "query")];
    let rows = (0..192i64)
        .map(|ts| {
            Row::with_labels(
                query_metric,
                query_labels.clone(),
                DataPoint::new(ts, ts as f64),
            )
        })
        .collect::<Vec<_>>();
    storage.insert_rows(&rows).unwrap();

    // Look up the shard index of the seeded series so a colliding writer series can be found.
    let query_series_id = storage
        .catalog
        .registry
        .read()
        .resolve_existing(query_metric, &query_labels)
        .unwrap()
        .series_id;
    let target_shard = ChunkStorage::series_shard_idx(query_series_id);

    // Create candidate writer series (each insert registers one) until one maps to the same
    // shard index as the query series; up to 1024 candidates are tried.
    let writer_metric = "snapshot_writer_metric";
    let writer_labels = (0..1024usize)
        .find_map(|idx| {
            let labels = vec![Label::new("host", format!("writer-{idx}"))];
            storage
                .insert_rows(&[Row::with_labels(
                    writer_metric,
                    labels.clone(),
                    DataPoint::new(10_000, 10_000.0),
                )])
                .unwrap();
            let series_id = storage
                .catalog
                .registry
                .read()
                .resolve_existing(writer_metric, &labels)
                .unwrap()
                .series_id;
            (ChunkStorage::series_shard_idx(series_id) == target_shard).then_some(labels)
        })
        .expect("expected same-shard writer series");

    // Snapshot thread: take the snapshot, report its counts, then block on the release
    // channel so `snapshot` and `stats` stay in scope while the writer runs.
    let snapshot_storage = Arc::clone(&storage);
    let (snapshot_ready_tx, snapshot_ready_rx) = mpsc::channel();
    let (snapshot_release_tx, snapshot_release_rx) = mpsc::channel();
    let snapshot_thread = thread::spawn(move || {
        let (snapshot, stats) =
            snapshot_storage.snapshot_in_memory_series_sources(query_series_id, 0, 256);
        snapshot_ready_tx
            .send((snapshot.active_point_count(), stats.snapshot_count()))
            .unwrap();
        snapshot_release_rx.recv().unwrap();
    });

    // Wait (up to 2 s) for the snapshot to exist before starting the writer.
    let (point_count, snapshots) = snapshot_ready_rx
        .recv_timeout(Duration::from_secs(2))
        .expect("snapshot did not complete");
    assert_eq!(point_count, 192);
    assert_eq!(snapshots, 1);

    // Writer thread: 32 single-row inserts into the same-shard series; the overall result is
    // sent back over a channel so the main thread can bound the wait.
    let writer_storage = Arc::clone(&storage);
    let (writer_tx, writer_rx) = mpsc::channel();
    let writer_thread = thread::spawn(move || {
        let result = (|| -> Result<()> {
            for offset in 1..=32i64 {
                writer_storage.insert_rows(&[Row::with_labels(
                    writer_metric,
                    writer_labels.clone(),
                    DataPoint::new(10_000 + offset, (10_000 + offset) as f64),
                )])?;
            }
            Ok(())
        })();
        writer_tx.send(result).unwrap();
    });

    // The writer must finish within 500 ms even though the snapshot has not been released.
    let writer_result = writer_rx
        .recv_timeout(Duration::from_millis(500))
        .expect("writer should progress after snapshot creation");
    assert!(writer_result.is_ok());

    // Unblock the snapshot thread and join both helpers.
    snapshot_release_tx.send(()).unwrap();
    writer_thread.join().unwrap();
    snapshot_thread.join().unwrap();
}

/// Writes points 1 and 2, flushes and persists them, then writes points 2 and 3 again so the
/// point at timestamp 2 exists both in the persisted segment and in memory. Both read paths
/// (merge and append-sort) must return the same three points with the duplicate collapsed.
#[test]
fn merge_and_append_sort_paths_match_for_persisted_and_active_exact_duplicates() {
    let temp_dir = TempDir::new().unwrap();
    let storage = persistent_numeric_storage(temp_dir.path(), 4);
    let labels = vec![Label::new("host", "equivalent")];
    let cpu_row =
        |ts: i64, value: f64| Row::with_labels("cpu", labels.clone(), DataPoint::new(ts, value));

    // First batch ends up in a persisted segment.
    let persisted_batch = [cpu_row(1, 1.0), cpu_row(2, 2.0)];
    storage.insert_rows(&persisted_batch).unwrap();
    storage.flush_all_active().unwrap();
    let outcome = storage.persist_segment_with_outcome().unwrap();
    assert!(outcome.persisted);

    // Second batch is inserted after persisting and repeats the point at timestamp 2.
    let in_memory_batch = [cpu_row(2, 2.0), cpu_row(3, 3.0)];
    storage.insert_rows(&in_memory_batch).unwrap();

    let series_id = storage
        .catalog
        .registry
        .read()
        .resolve_existing("cpu", &labels)
        .unwrap()
        .series_id;
    let plan = TieredQueryPlan::from_cutoffs(0, 10, None, None);

    // Merge path.
    let (merge_snapshot, _) = storage
        .snapshot_series_read_sources(series_id, 0, 10, plan)
        .unwrap();
    assert!(merge_snapshot.analysis.can_use_merge_path());
    let mut merge_points = Vec::new();
    storage
        .execute_series_read_merge_path(series_id, 0, 10, merge_snapshot, &mut merge_points)
        .unwrap();

    // Append-sort path, run against a fresh snapshot of the same range.
    let (append_snapshot, _) = storage
        .snapshot_series_read_sources(series_id, 0, 10, plan)
        .unwrap();
    let mut append_sort_points = Vec::new();
    storage
        .execute_series_read_append_sort_path(
            series_id,
            0,
            10,
            append_snapshot,
            &mut append_sort_points,
        )
        .unwrap();

    let expected = vec![
        DataPoint::new(1, 1.0),
        DataPoint::new(2, 2.0),
        DataPoint::new(3, 3.0),
    ];
    assert_eq!(merge_points, append_sort_points);
    assert_eq!(merge_points, expected);

    storage.close().unwrap();
}