tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
use std::path::PathBuf;
use thiserror::Error;

pub type Result<T> = std::result::Result<T, TsinkError>;

#[derive(Error, Debug)]
pub enum TsinkError {
    #[error("No data points found for metric '{metric}' in range [{start}, {end})")]
    NoDataPoints {
        metric: String,
        start: i64,
        end: i64,
    },

    #[error("Invalid timestamp range: start {start} >= end {end}")]
    InvalidTimeRange { start: i64, end: i64 },

    #[error("Metric name is required")]
    MetricRequired,

    #[error("Invalid metric name: {0}")]
    InvalidMetricName(String),

    #[error("Invalid label: {0}")]
    InvalidLabel(String),

    #[error("Partition not found for timestamp {timestamp}")]
    PartitionNotFound { timestamp: i64 },

    #[error("Invalid partition ID: {id}")]
    InvalidPartition { id: String },

    #[error("Cannot insert rows into read-only partition at {path:?}")]
    ReadOnlyPartition { path: PathBuf },

    #[error("Write timeout exceeded after {timeout_ms}ms with {workers} concurrent writers")]
    WriteTimeout { timeout_ms: u64, workers: usize },

    #[error("Memory budget exceeded: budget {budget} bytes, required at least {required} bytes")]
    MemoryBudgetExceeded { budget: usize, required: usize },

    #[error(
        "Series cardinality limit exceeded: limit {limit}, current {current}, requested {requested}"
    )]
    CardinalityLimitExceeded {
        limit: usize,
        current: usize,
        requested: usize,
    },

    #[error("WAL size limit exceeded: limit {limit} bytes, required at least {required} bytes")]
    WalSizeLimitExceeded { limit: u64, required: u64 },

    #[error("Storage is shutting down")]
    StorageShuttingDown,

    #[error("Storage already closed")]
    StorageClosed,

    #[error("Invalid configuration: {0}")]
    InvalidConfiguration(String),

    #[error("Unsupported operation '{operation}': {reason}")]
    UnsupportedOperation {
        operation: &'static str,
        reason: String,
    },

    #[error("Data corruption detected: {0}")]
    DataCorruption(String),

    #[error("Insufficient disk space: required {required} bytes, available {available} bytes")]
    InsufficientDiskSpace { required: u64, available: u64 },

    #[error("IO error at path {path:?}: {source}")]
    IoWithPath {
        path: PathBuf,
        #[source]
        source: std::io::Error,
    },

    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    #[error("JSON serialization error: {0}")]
    Json(#[from] serde_json::Error),

    #[error("Bincode serialization error: {0}")]
    Bincode(#[from] bincode::Error),

    #[error("UTF-8 conversion error: {0}")]
    Utf8(#[from] std::string::FromUtf8Error),

    #[error("Lock poisoned for resource: {resource}")]
    LockPoisoned { resource: String },

    #[error("Channel send error for {channel}")]
    ChannelSend { channel: String },

    #[error("Channel receive error for {channel}")]
    ChannelReceive { channel: String },

    #[error("Channel timeout after {timeout_ms}ms")]
    ChannelTimeout { timeout_ms: u64 },

    #[error("Memory map error at {path:?}: {details}")]
    MemoryMap { path: PathBuf, details: String },

    #[error("Invalid offset {offset} exceeds maximum {max}")]
    InvalidOffset { offset: u64, max: u64 },

    #[error("WAL error: {operation} failed: {details}")]
    Wal { operation: String, details: String },

    #[error("Compression error: {0}")]
    Compression(String),

    #[error("Checksum mismatch: expected {expected:?}, got {actual:?}")]
    ChecksumMismatch { expected: Vec<u8>, actual: Vec<u8> },

    #[error("Data point with timestamp {timestamp} is outside the retention window")]
    OutOfRetention { timestamp: i64 },

    #[error(
        "Late write at timestamp {timestamp} would open partition {partition_id} beyond the active-head limit {max_active_partition_heads_per_series} (active range {oldest_active_partition_id}..={newest_active_partition_id})"
    )]
    LateWritePartitionFanoutExceeded {
        timestamp: i64,
        partition_id: i64,
        max_active_partition_heads_per_series: usize,
        oldest_active_partition_id: i64,
        newest_active_partition_id: i64,
    },

    #[error("Unsupported aggregation '{aggregation}' for value type '{value_type}'")]
    UnsupportedAggregation {
        aggregation: String,
        value_type: String,
    },

    #[error("Value type mismatch: expected {expected}, found {actual}")]
    ValueTypeMismatch { expected: String, actual: String },

    #[error("Codec error: {0}")]
    Codec(String),

    #[error("Other error: {0}")]
    Other(String),
}

impl<T> From<std::sync::PoisonError<T>> for TsinkError {
    fn from(err: std::sync::PoisonError<T>) -> Self {
        TsinkError::LockPoisoned {
            resource: format!("{:?}", err),
        }
    }
}

impl<T> From<crossbeam_channel::SendError<T>> for TsinkError {
    fn from(err: crossbeam_channel::SendError<T>) -> Self {
        TsinkError::ChannelSend {
            channel: format!("{:?}", err),
        }
    }
}

impl From<crossbeam_channel::RecvError> for TsinkError {
    fn from(err: crossbeam_channel::RecvError) -> Self {
        TsinkError::ChannelReceive {
            channel: format!("{:?}", err),
        }
    }
}

impl From<crossbeam_channel::RecvTimeoutError> for TsinkError {
    fn from(e: crossbeam_channel::RecvTimeoutError) -> Self {
        match e {
            crossbeam_channel::RecvTimeoutError::Timeout => {
                TsinkError::ChannelTimeout { timeout_ms: 0 }
            }
            crossbeam_channel::RecvTimeoutError::Disconnected => TsinkError::ChannelReceive {
                channel: "timeout: channel disconnected".to_string(),
            },
        }
    }
}

#[cfg(test)]
mod tests {
    use super::TsinkError;
    use crossbeam_channel::RecvTimeoutError;
    use std::sync::{Arc, Mutex};
    use std::thread;

    #[test]
    fn recv_timeout_error_timeout_maps_to_channel_timeout() {
        let err: TsinkError = RecvTimeoutError::Timeout.into();
        assert!(matches!(err, TsinkError::ChannelTimeout { timeout_ms: 0 }));
    }

    #[test]
    fn recv_timeout_error_disconnected_maps_to_channel_receive() {
        let err: TsinkError = RecvTimeoutError::Disconnected.into();
        assert!(matches!(err, TsinkError::ChannelReceive { .. }));
    }

    #[test]
    fn recv_error_maps_to_channel_receive() {
        let (tx, rx) = crossbeam_channel::bounded::<u8>(1);
        drop(tx);

        let err: TsinkError = rx.recv().unwrap_err().into();
        assert!(matches!(err, TsinkError::ChannelReceive { .. }));
    }

    #[test]
    fn send_error_maps_to_channel_send() {
        let (tx, rx) = crossbeam_channel::bounded::<u8>(1);
        drop(rx);

        let err: TsinkError = tx.send(1).unwrap_err().into();
        assert!(matches!(err, TsinkError::ChannelSend { .. }));
    }

    #[test]
    fn poison_error_maps_to_lock_poisoned() {
        let shared = Arc::new(Mutex::new(1usize));
        let shared_for_thread = Arc::clone(&shared);

        let _ = thread::spawn(move || {
            let _guard = shared_for_thread.lock().unwrap();
            panic!("intentional panic to poison mutex");
        })
        .join();

        let poison = shared.lock().unwrap_err();
        let err: TsinkError = poison.into();
        assert!(matches!(err, TsinkError::LockPoisoned { .. }));
    }
}