tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use super::super::*;
use super::base_storage_test_options;

pub(in crate::engine::storage_engine::tests) fn persistent_numeric_storage(
    root: &Path,
    timestamp_precision: TimestampPrecision,
    chunk_point_cap: usize,
) -> ChunkStorage {
    persistent_numeric_storage_with_chunk_cap(root, timestamp_precision, chunk_point_cap)
}

pub(in crate::engine::storage_engine::tests) fn persistent_numeric_storage_with_chunk_cap(
    root: &Path,
    timestamp_precision: TimestampPrecision,
    chunk_point_cap: usize,
) -> ChunkStorage {
    ChunkStorage::new_with_data_path_and_options(
        chunk_point_cap,
        None,
        Some(root.join(NUMERIC_LANE_ROOT)),
        None,
        1,
        base_storage_test_options(timestamp_precision, None),
    )
    .unwrap()
}

pub(in crate::engine::storage_engine::tests) fn reopen_persistent_numeric_storage(
    root: &Path,
    timestamp_precision: TimestampPrecision,
    chunk_point_cap: usize,
) -> ChunkStorage {
    reopen_persistent_numeric_storage_with_chunk_cap(root, timestamp_precision, chunk_point_cap)
}

pub(in crate::engine::storage_engine::tests) fn reopen_persistent_numeric_storage_with_chunk_cap(
    root: &Path,
    timestamp_precision: TimestampPrecision,
    chunk_point_cap: usize,
) -> ChunkStorage {
    let storage =
        persistent_numeric_storage_with_chunk_cap(root, timestamp_precision, chunk_point_cap);
    storage.load_tombstones_index().unwrap();
    storage
        .apply_loaded_segment_indexes(
            load_segment_indexes(root.join(NUMERIC_LANE_ROOT)).unwrap(),
            false,
        )
        .unwrap();
    storage.reconcile_live_metadata_indexes().unwrap();
    storage
}

pub(in crate::engine::storage_engine::tests) fn persistent_numeric_storage_with_metadata_shards(
    root: &Path,
    timestamp_precision: TimestampPrecision,
    chunk_point_cap: usize,
    metadata_shard_count: u32,
) -> ChunkStorage {
    let mut options = base_storage_test_options(timestamp_precision, None);
    options.metadata_shard_count = Some(metadata_shard_count);
    ChunkStorage::new_with_data_path_and_options(
        chunk_point_cap,
        None,
        Some(root.join(NUMERIC_LANE_ROOT)),
        None,
        1,
        options,
    )
    .unwrap()
}

pub(in crate::engine::storage_engine::tests) fn live_numeric_storage_with_config(
    root: &Path,
    timestamp_precision: TimestampPrecision,
    chunk_point_cap: usize,
    current_time_override: i64,
    retention: Duration,
    metadata_shard_count: Option<u32>,
) -> ChunkStorage {
    let mut options = base_storage_test_options(timestamp_precision, Some(current_time_override));
    options.retention_window =
        super::super::super::duration_to_timestamp_units(retention, timestamp_precision);
    options.metadata_shard_count = metadata_shard_count;
    ChunkStorage::new_with_data_path_and_options(
        chunk_point_cap,
        None,
        Some(root.join(NUMERIC_LANE_ROOT)),
        None,
        1,
        options,
    )
    .unwrap()
}

pub(in crate::engine::storage_engine::tests) fn persistent_rollup_storage(
    root: &Path,
) -> Arc<ChunkStorage> {
    let mut options = base_storage_test_options(TimestampPrecision::Milliseconds, None);
    options.retention_enforced = false;

    Arc::new(
        ChunkStorage::new_with_data_path_and_options(
            8,
            None,
            Some(root.join(NUMERIC_LANE_ROOT)),
            Some(root.join(BLOB_LANE_ROOT)),
            1,
            options,
        )
        .unwrap(),
    )
}

pub(in crate::engine::storage_engine::tests) fn reopen_persistent_rollup_storage(
    root: &Path,
) -> Arc<ChunkStorage> {
    let storage = persistent_rollup_storage(root);
    storage.load_tombstones_index().unwrap();

    let snapshot_path = root.join(SERIES_INDEX_FILE_NAME);
    let loaded_registry = SeriesRegistry::load_persisted_state(&snapshot_path).unwrap();
    let reconcile_registry_with_persisted = loaded_registry.is_some();
    if let Some(loaded_registry) = loaded_registry {
        storage
            .replace_registry_from_persisted_state(
                loaded_registry.registry,
                loaded_registry.delta_series_count,
            )
            .unwrap();
    }

    let loaded_segments = merge_loaded_segment_indexes(
        load_segment_indexes(root.join(NUMERIC_LANE_ROOT)).unwrap(),
        load_segment_indexes(root.join(BLOB_LANE_ROOT)).unwrap(),
        true,
        true,
    )
    .unwrap();
    storage
        .apply_loaded_segment_indexes(loaded_segments, reconcile_registry_with_persisted)
        .unwrap();
    storage.load_rollup_runtime_state().unwrap();
    storage.reconcile_live_metadata_indexes().unwrap();
    storage.refresh_segment_catalog_and_observability().unwrap();
    storage
}

pub(in crate::engine::storage_engine::tests) fn open_raw_numeric_storage_from_data_path(
    data_path: &Path,
) -> ChunkStorage {
    open_raw_numeric_storage_from_data_path_with_background_fail_fast(data_path, false)
}

pub(in crate::engine::storage_engine::tests) fn open_raw_numeric_storage_from_data_path_with_background_fail_fast(
    data_path: &Path,
    background_fail_fast: bool,
) -> ChunkStorage {
    let lane_path = data_path.join(NUMERIC_LANE_ROOT);
    let loaded_segments = load_segment_indexes(&lane_path).unwrap();
    let storage = ChunkStorage::new_with_data_path_and_options(
        2,
        None,
        Some(lane_path),
        None,
        loaded_segments.next_segment_id,
        ChunkStorageOptions {
            retention_enforced: false,
            background_threads_enabled: false,
            background_fail_fast,
            ..ChunkStorageOptions::default()
        },
    )
    .unwrap();
    storage
        .apply_loaded_segment_indexes(loaded_segments, false)
        .unwrap();
    storage
}

pub(in crate::engine::storage_engine::tests) fn open_raw_numeric_storage_with_registry_snapshot_from_data_path(
    data_path: &Path,
) -> ChunkStorage {
    let lane_path = data_path.join(NUMERIC_LANE_ROOT);
    let snapshot_path = data_path.join(SERIES_INDEX_FILE_NAME);
    let loaded_registry = SeriesRegistry::load_persisted_state(&snapshot_path).unwrap();
    let reconcile_registry_with_persisted = loaded_registry.is_some();
    let loaded_segments = load_segment_indexes(&lane_path).unwrap();
    let storage = ChunkStorage::new_with_data_path_and_options(
        2,
        None,
        Some(lane_path),
        None,
        loaded_segments.next_segment_id,
        ChunkStorageOptions {
            retention_enforced: false,
            background_threads_enabled: false,
            ..ChunkStorageOptions::default()
        },
    )
    .unwrap();
    if let Some(loaded_registry) = loaded_registry {
        storage
            .replace_registry_from_persisted_state(
                loaded_registry.registry,
                loaded_registry.delta_series_count,
            )
            .unwrap();
    }
    storage
        .apply_loaded_segment_indexes(loaded_segments, reconcile_registry_with_persisted)
        .unwrap();
    storage
}