tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use std::collections::{BTreeMap, BTreeSet};
use std::fs::{self, File, OpenOptions};
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
#[cfg(test)]
use std::sync::Arc;
use std::time::Instant;

use parking_lot::{Condvar, Mutex, MutexGuard};
use tracing::warn;

use crate::engine::binio::{
    append_i64, append_u16, append_u32, append_u64, append_u8, checksum32, read_bytes, read_i64,
    read_u16, read_u32, read_u32_at, read_u64, read_u64_at, read_u8, write_u32_at, write_u64_at,
};
use crate::engine::chunk::{ChunkPoint, TimestampCodecId, ValueCodecId, ValueLane};
use crate::engine::encoder::{EncodedChunk, Encoder};
use crate::engine::segment::WalHighWatermark;
use crate::engine::series::SeriesId;
use crate::wal::{WalReplayMode, WalSyncMode};
use crate::{Label, Result, TsinkError, Value};

mod codec;
mod logical;
mod replay;
mod segments;
mod series_index;
#[cfg(test)]
mod tests;

#[cfg(test)]
use codec::encode_series_definition;
use codec::{
    decode_samples_series_ids, decode_series_definition, merge_encoded_payload,
    split_encoded_payload,
};
pub(in crate::engine) use logical::LogicalWalWrite;
#[cfg(test)]
use replay::{replay_from_path, replay_from_path_with_mode};
pub use replay::{CommittedWalReplayStream, WalReplayStream};
#[cfg(test)]
use segments::{collect_wal_segment_files, scan_last_seq, segment_path};
use series_index::{CachedSeriesDefinitionFrame, CachedSeriesDefinitionIndex};

// File names used by the WAL.
// NOTE(review): how the single `wal.log` name relates to the `wal-` / `.log`
// segment naming is decided outside this view — confirm in `segments`.
const WAL_FILE_NAME: &str = "wal.log";
const WAL_SEGMENT_FILE_PREFIX: &str = "wal-";
const WAL_SEGMENT_FILE_SUFFIX: &str = ".log";
// File name for the published high-water record and a `.tmp` sibling
// (presumably written first and then moved into place — TODO confirm).
const WAL_PUBLISHED_HIGHWATER_FILE_NAME: &str = "wal.published";
const WAL_PUBLISHED_HIGHWATER_TMP_FILE_NAME: &str = "wal.published.tmp";
// Default sizes: 4096 bytes for the write buffer, 64 MiB per WAL segment.
const DEFAULT_WAL_BUFFER_SIZE: usize = 4096;
const DEFAULT_WAL_SEGMENT_MAX_BYTES: u64 = 64 * 1024 * 1024;
// Frame layout constants: 4-byte magic `TSFR`, a header length of 24 and a
// payload cap of 64 MiB.
// NOTE(review): the exact header field layout lives in the codec/replay code.
const FRAME_MAGIC: [u8; 4] = *b"TSFR";
const FRAME_HEADER_LEN: usize = 24;
const MAX_FRAME_PAYLOAD_BYTES: usize = 64 * 1024 * 1024;
// Published high-water record constants: 4-byte magic `TSHW` and a record
// length of 24.
const PUBLISHED_HIGHWATER_MAGIC: [u8; 4] = *b"TSHW";
const PUBLISHED_HIGHWATER_RECORD_LEN: usize = 24;

// Frame type tags distinguishing series-definition frames from sample frames.
const FRAME_TYPE_SERIES_DEF: u8 = 1;
const FRAME_TYPE_SAMPLES: u8 = 2;

// Test-only hook type: a shareable, fallible callback taking no arguments.
// NOTE(review): presumably invoked around the append/sync path — confirm at
// the call site.
#[cfg(test)]
type WalAppendSyncHook = dyn Fn() -> Result<()> + Send + Sync + 'static;
// Test-only hook type: a shareable, infallible callback taking no arguments.
// NOTE(review): presumably invoked when the cached series-definition index is
// rebuilt — confirm at the call site.
#[cfg(test)]
type WalCachedSeriesDefinitionRebuildHook = dyn Fn() + Send + Sync + 'static;

/// A series definition as carried by the WAL: the series id together with
/// the metric name and labels that identify it.
#[derive(Debug, Clone)]
pub struct SeriesDefinitionFrame {
    /// Identifier of the series being defined.
    pub series_id: SeriesId,
    /// Metric name of the series.
    pub metric: String,
    /// Labels attached to the series.
    pub labels: Vec<Label>,
}

/// An encoded batch of samples for a single series, as carried by the WAL.
///
/// The timestamp and value payloads are kept separately; `decode_points`
/// merges them again before handing the chunk to the encoder for decoding.
#[derive(Debug, Clone)]
pub struct SamplesBatchFrame {
    /// Series the samples belong to.
    pub series_id: SeriesId,
    /// Value lane the points were encoded for.
    pub lane: ValueLane,
    /// Timestamp codec reported by the encoder for this batch.
    pub ts_codec: TimestampCodecId,
    /// Value codec reported by the encoder for this batch.
    pub value_codec: ValueCodecId,
    /// Number of points in the batch; constructors reject batches whose
    /// length does not fit in a `u16`.
    pub point_count: u16,
    /// Timestamp of the first point in the batch; checked against the decoded
    /// points in `decode_points`.
    pub base_ts: i64,
    /// Timestamp half of the encoded chunk payload.
    pub ts_payload: Vec<u8>,
    /// Value half of the encoded chunk payload.
    pub value_payload: Vec<u8>,
}

impl SamplesBatchFrame {
    /// Encodes `points` for `lane` and wraps the result in a WAL batch frame.
    ///
    /// # Errors
    ///
    /// Propagates failures from the encoder and from splitting the encoded
    /// payload. Returns `TsinkError::InvalidConfiguration` when `points` is
    /// empty or its length does not fit in a `u16`.
    pub fn from_points(
        series_id: SeriesId,
        lane: ValueLane,
        points: &[ChunkPoint],
    ) -> Result<Self> {
        let encoded = Encoder::encode_chunk_points(points, lane)?;
        let (ts_payload, value_payload) = split_encoded_payload(&encoded.payload)?;

        let first_ts = points.first().map(|point| point.ts);
        let (base_ts, point_count) = Self::validated_batch_shape(first_ts, points.len())?;

        Ok(Self {
            series_id,
            lane,
            ts_codec: encoded.ts_codec,
            value_codec: encoded.value_codec,
            point_count,
            base_ts,
            ts_payload,
            value_payload,
        })
    }

    /// Same as [`SamplesBatchFrame::from_points`], but takes the batch as
    /// `(timestamp, &Value)` pairs.
    ///
    /// # Errors
    ///
    /// Propagates failures from the encoder and from splitting the encoded
    /// payload. Returns `TsinkError::InvalidConfiguration` when `points` is
    /// empty or its length does not fit in a `u16`.
    pub fn from_timestamp_value_refs(
        series_id: SeriesId,
        lane: ValueLane,
        points: &[(i64, &Value)],
    ) -> Result<Self> {
        let encoded = Encoder::encode_timestamp_value_refs(points, lane)?;
        let (ts_payload, value_payload) = split_encoded_payload(&encoded.payload)?;

        let first_ts = points.first().map(|&(ts, _)| ts);
        let (base_ts, point_count) = Self::validated_batch_shape(first_ts, points.len())?;

        Ok(Self {
            series_id,
            lane,
            ts_codec: encoded.ts_codec,
            value_codec: encoded.value_codec,
            point_count,
            base_ts,
            ts_payload,
            value_payload,
        })
    }

    /// Decodes the batch back into its points.
    ///
    /// # Errors
    ///
    /// Propagates decoder failures. Returns `TsinkError::DataCorruption` when
    /// the first decoded timestamp differs from `base_ts`, which includes the
    /// case where decoding yields no points at all.
    pub fn decode_points(&self) -> Result<Vec<ChunkPoint>> {
        let encoded = EncodedChunk {
            lane: self.lane,
            ts_codec: self.ts_codec,
            value_codec: self.value_codec,
            point_count: usize::from(self.point_count),
            payload: merge_encoded_payload(&self.ts_payload, &self.value_payload),
        };

        let points = Encoder::decode_chunk_points(&encoded)?;
        let starts_at_base = matches!(points.first(), Some(first) if first.ts == self.base_ts);
        if starts_at_base {
            Ok(points)
        } else {
            Err(TsinkError::DataCorruption(
                "WAL batch base_ts does not match decoded timestamps".to_string(),
            ))
        }
    }

    /// Checks that a batch is non-empty and small enough for a `u16` count,
    /// returning `(base_ts, point_count)`. The emptiness check runs first.
    fn validated_batch_shape(first_ts: Option<i64>, len: usize) -> Result<(i64, u16)> {
        let base_ts = match first_ts {
            Some(ts) => ts,
            None => {
                return Err(TsinkError::InvalidConfiguration(
                    "cannot WAL-encode empty batch".to_string(),
                ));
            }
        };

        let point_count = match u16::try_from(len) {
            Ok(count) => count,
            Err(_) => {
                return Err(TsinkError::InvalidConfiguration(
                    "WAL batch exceeds u16 point count".to_string(),
                ));
            }
        };

        Ok((base_ts, point_count))
    }
}

/// A unit produced when reading the WAL back: either one series definition
/// or a group of sample batches.
// NOTE(review): presumably each variant corresponds to one on-disk frame of
// the matching `FRAME_TYPE_*` tag — confirm in the `replay` module.
#[derive(Debug, Clone)]
pub enum ReplayFrame {
    /// A single series definition.
    SeriesDefinition(SeriesDefinitionFrame),
    /// One or more encoded sample batches.
    Samples(Vec<SamplesBatchFrame>),
}

/// Series definitions and sample batches grouped together with a WAL
/// high-water mark.
// NOTE(review): presumably one committed logical write as yielded by
// `CommittedWalReplayStream` — confirm in the `replay` module.
#[derive(Debug, Clone)]
pub struct CommittedWalWriteFrame {
    /// Series definitions included in this write.
    pub series_definitions: Vec<SeriesDefinitionFrame>,
    /// Sample batches included in this write.
    pub sample_batches: Vec<SamplesBatchFrame>,
    /// High-water mark associated with this write.
    pub highwater: WalHighWatermark,
}

/// State of a frame-based write-ahead log.
///
/// Mutable state is held behind `parking_lot` mutexes and atomics.
// NOTE(review): the field notes below are inferred from names and types only;
// the methods that use them are outside this view — confirm before relying
// on them.
pub struct FramedWal {
    // Directory associated with the WAL (presumably where segment files live).
    dir: PathBuf,
    // Mutable path (presumably of the segment currently being written).
    path: Mutex<PathBuf>,
    // Paths for the published high-water record and its temporary file.
    published_highwater_path: PathBuf,
    published_highwater_tmp_path: PathBuf,
    // Buffered writer over the open WAL file.
    writer: Mutex<BufWriter<File>>,
    // Counters: active segment, its size, the next sequence number, the
    // total WAL size and the number of segments (presumed meanings).
    active_segment: AtomicU64,
    active_segment_size_bytes: AtomicU64,
    next_seq: AtomicU64,
    total_size_bytes: AtomicU64,
    segment_count: AtomicU64,
    // Cached index of series-definition frames, paired with a condition
    // variable (presumably signalled once the index is ready).
    cached_series_definition_index: Mutex<CachedSeriesDefinitionIndex>,
    cached_series_definition_index_ready: Condvar,
    // Most recent appended / published / durable high-water marks.
    last_appended_highwater: Mutex<WalHighWatermark>,
    last_published_highwater: Mutex<WalHighWatermark>,
    last_durable_highwater: Mutex<WalHighWatermark>,
    // Replay mode configured for this WAL; mutable after construction.
    configured_replay_mode: Mutex<WalReplayMode>,
    // Sync policy and the instant of the last sync.
    sync_mode: WalSyncMode,
    last_sync: Mutex<Instant>,
    // Size limit for a segment, in bytes.
    segment_max_bytes: u64,
    // Optional hooks that exist only in test builds.
    #[cfg(test)]
    append_sync_hook: Mutex<Option<Arc<WalAppendSyncHook>>>,
    #[cfg(test)]
    cached_series_definition_rebuild_hook: Mutex<Option<Arc<WalCachedSeriesDefinitionRebuildHook>>>,
}