tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use std::path::{Component, Path, PathBuf};

use serde::{Deserialize, Serialize};

use super::super::config::TieredStorageConfig;
use super::super::{Result, TsinkError};
use super::inventory::SegmentInventoryAccumulator;
use super::layout::{relative_segment_path, SegmentPathResolver};
use super::{PersistedSegmentTier, SegmentInventory, SegmentInventoryEntry, SegmentLaneFamily};
use crate::engine::fs_utils::write_file_atomically_and_sync_parent;
use crate::engine::segment::{SegmentManifest, WalHighWatermark};

/// File name of the JSON segment catalog. `shared_segment_catalog_path` joins it
/// onto the tiered-storage object-store root.
pub(in crate::engine::storage_engine) const SEGMENT_CATALOG_FILE_NAME: &str =
    "segment_catalog.json";
/// Format version written by `persist_segment_catalog`. `load_segment_catalog`
/// accepts any version in `1..=SEGMENT_CATALOG_VERSION` and rejects everything else.
const SEGMENT_CATALOG_VERSION: u32 = 2;

/// Top-level document stored in the segment catalog file (serialized as JSON).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SegmentCatalogFile {
    /// Catalog format version; checked against `SEGMENT_CATALOG_VERSION` on load.
    version: u32,
    /// One record per segment inventory entry that was persisted.
    entries: Vec<SegmentCatalogEntry>,
}

/// Flattened, serializable form of a single `SegmentInventoryEntry`.
///
/// The manifest fields are stored inline rather than nested. Fields marked
/// `#[serde(default)]` fall back to their type's default (zero) when absent from
/// the JSON — presumably so catalogs written by an older format version still
/// load; confirm against the version 1 layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SegmentCatalogEntry {
    /// Lane family the segment belongs to.
    lane: SegmentLaneFamily,
    /// Storage tier the segment is persisted in.
    tier: PersistedSegmentTier,
    /// Copied from `SegmentManifest::level`.
    level: u8,
    /// Copied from `SegmentManifest::segment_id`.
    segment_id: u64,
    /// Copied from `SegmentManifest::chunk_count`; defaults to 0 when missing.
    #[serde(default)]
    chunk_count: usize,
    /// Copied from `SegmentManifest::point_count`; defaults to 0 when missing.
    #[serde(default)]
    point_count: usize,
    /// Copied from `SegmentManifest::series_count`; defaults to 0 when missing.
    #[serde(default)]
    series_count: usize,
    /// Copied from `SegmentManifest::min_ts`.
    min_ts: Option<i64>,
    /// Copied from `SegmentManifest::max_ts`.
    max_ts: Option<i64>,
    /// `segment` component of the manifest's WAL high watermark; defaults to 0 when missing.
    #[serde(default)]
    wal_highwater_segment: u64,
    /// `frame` component of the manifest's WAL high watermark; defaults to 0 when missing.
    #[serde(default)]
    wal_highwater_frame: u64,
    /// Segment location relative to its lane root. On load it must equal the path
    /// that `relative_segment_path` derives from the manifest fields above.
    relative_path: String,
}

/// Writes `inventory` to `path` as a pretty-printed JSON segment catalog.
///
/// Each inventory entry is flattened into a `SegmentCatalogEntry`: the manifest
/// fields are copied verbatim and the segment's lane-relative path is stored as a
/// (lossily converted) string. The document is stamped with
/// `SEGMENT_CATALOG_VERSION` and handed to `write_file_atomically_and_sync_parent`.
///
/// # Errors
///
/// Returns an error if JSON serialization fails or if writing the file fails.
pub(in crate::engine::storage_engine) fn persist_segment_catalog(
    path: &Path,
    inventory: &SegmentInventory,
) -> Result<()> {
    let mut records = Vec::new();
    for item in inventory.entries().iter() {
        let manifest = &item.manifest;
        let relative = relative_segment_path(manifest);
        records.push(SegmentCatalogEntry {
            lane: item.lane,
            tier: item.tier,
            level: manifest.level,
            segment_id: manifest.segment_id,
            chunk_count: manifest.chunk_count,
            point_count: manifest.point_count,
            series_count: manifest.series_count,
            min_ts: manifest.min_ts,
            max_ts: manifest.max_ts,
            wal_highwater_segment: manifest.wal_highwater.segment,
            wal_highwater_frame: manifest.wal_highwater.frame,
            relative_path: relative.to_string_lossy().into_owned(),
        });
    }

    let catalog = SegmentCatalogFile {
        version: SEGMENT_CATALOG_VERSION,
        entries: records,
    };
    let encoded = serde_json::to_vec_pretty(&catalog)?;
    write_file_atomically_and_sync_parent(path, &encoded)
}

/// Returns the location of the shared segment catalog: `SEGMENT_CATALOG_FILE_NAME`
/// joined onto `config.object_store_root`.
pub(in crate::engine::storage_engine) fn shared_segment_catalog_path(
    config: &TieredStorageConfig,
) -> PathBuf {
    let store_root = &config.object_store_root;
    store_root.join(SEGMENT_CATALOG_FILE_NAME)
}

pub(in crate::engine::storage_engine) fn load_segment_catalog(
    path: &Path,
    numeric_lane_path: Option<&Path>,
    blob_lane_path: Option<&Path>,
    tiered_storage: Option<&TieredStorageConfig>,
) -> Result<SegmentInventory> {
    let bytes = std::fs::read(path)?;
    let file: SegmentCatalogFile = serde_json::from_slice(&bytes)?;
    if !(1..=SEGMENT_CATALOG_VERSION).contains(&file.version) {
        return Err(TsinkError::InvalidConfiguration(format!(
            "unsupported segment catalog version {} at {}",
            file.version,
            path.display()
        )));
    }

    let resolver = SegmentPathResolver::new(numeric_lane_path, blob_lane_path, tiered_storage);
    let mut deduped = SegmentInventoryAccumulator::default();
    for entry in file.entries {
        deduped.insert(catalog_entry_to_inventory_entry(entry, resolver)?);
    }

    Ok(deduped.finish())
}

fn catalog_entry_to_inventory_entry(
    entry: SegmentCatalogEntry,
    resolver: SegmentPathResolver<'_>,
) -> Result<SegmentInventoryEntry> {
    let manifest = SegmentManifest {
        segment_id: entry.segment_id,
        level: entry.level,
        chunk_count: entry.chunk_count,
        point_count: entry.point_count,
        series_count: entry.series_count,
        min_ts: entry.min_ts,
        max_ts: entry.max_ts,
        wal_highwater: WalHighWatermark {
            segment: entry.wal_highwater_segment,
            frame: entry.wal_highwater_frame,
        },
    };
    let relative_path = validated_catalog_relative_path(&entry.relative_path, &manifest)?;
    let base_path = resolver.catalog_lane_root(entry.lane, entry.tier)?;
    Ok(SegmentInventoryEntry {
        lane: entry.lane,
        tier: entry.tier,
        root: base_path.join(relative_path),
        manifest,
    })
}

/// Checks a catalog entry's stored relative path and returns it as a `PathBuf`.
///
/// A path is rejected if it is absolute or contains a prefix, root, or `..`
/// component, and it must equal the path `relative_segment_path` derives from
/// `manifest`.
///
/// # Errors
///
/// Returns `TsinkError::InvalidConfiguration` when either check fails.
fn validated_catalog_relative_path(
    relative_path: &str,
    manifest: &SegmentManifest,
) -> Result<PathBuf> {
    let candidate = PathBuf::from(relative_path);

    // Only plain names and `.` are allowed; every other component kind
    // (prefix, root, `..`) could point outside the lane root.
    let has_forbidden_component = candidate
        .components()
        .any(|part| !matches!(part, Component::Normal(_) | Component::CurDir));
    let escapes_lane_root = candidate.is_absolute() || has_forbidden_component;
    if escapes_lane_root {
        return Err(TsinkError::InvalidConfiguration(format!(
            "segment catalog entry path must stay within the configured lane root: {relative_path}"
        )));
    }

    let expected = relative_segment_path(manifest);
    if candidate == expected {
        Ok(candidate)
    } else {
        Err(TsinkError::InvalidConfiguration(format!(
            "segment catalog entry path {relative_path} did not match expected relative path {}",
            expected.display()
        )))
    }
}