// mindb 0.1.2
//
// Lightweight embedded key–value store with write-ahead log and zstd compression.
// Documentation: see the module docs below.
#![allow(dead_code)]
//! Instrumentation helpers for MindB.
//!
//! The observe module provides lightweight building blocks for capturing
//! operation latencies, compaction back-pressure, and corruption scrubbing
//! results without pulling in heavyweight metric dependencies. The data
//! structures are intentionally simple yet thread-safe so they can be used from
//! the async runtime as well as background threads.

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, Ordering};
use std::time::{Duration, Instant};

use parking_lot::Mutex;

use crate::storage::manifest::SegmentTier;
use crate::storage::{Segment, StorageError};

/// Default histogram bucket boundaries expressed in microseconds.
///
/// Roughly logarithmic 1-2-5 steps from 5 µs up to 10 s; observations above
/// the last bound are counted in the histogram's overflow bucket.
const DEFAULT_BUCKETS: &[u64] = &[
    5, 10, 20, 50, 100, 200, 500, 1_000, 2_000, 5_000, 10_000, 20_000, 50_000, 100_000, 200_000,
    500_000, 1_000_000, 2_000_000, 5_000_000, 10_000_000,
];

/// Thread-safe histogram of `u64` values (typically latencies in microseconds).
///
/// Cloning is cheap and clones share state: the inner state lives behind an
/// `Arc<Mutex<_>>`, so every clone observes into the same histogram.
#[derive(Clone, Debug)]
pub struct Histogram {
    // All mutation goes through this single lock-protected state.
    inner: Arc<Mutex<HistogramState>>,
}

impl Histogram {
    /// Creates a histogram with caller-provided bucket upper bounds
    /// (microseconds). Bounds are sorted and deduplicated before use.
    ///
    /// # Panics
    ///
    /// Panics if `bounds` is empty.
    pub fn with_bounds(mut bounds: Vec<u64>) -> Self {
        assert!(
            !bounds.is_empty(),
            "histogram must have at least one bucket"
        );
        // `record` relies on sorted, unique bounds for its bucket search.
        bounds.sort_unstable();
        bounds.dedup();
        Self {
            inner: Arc::new(Mutex::new(HistogramState::new(bounds))),
        }
    }

    /// Creates a histogram using [`DEFAULT_BUCKETS`].
    pub fn new() -> Self {
        Self::with_bounds(DEFAULT_BUCKETS.to_vec())
    }

    /// Records a duration, saturating at `u64::MAX` microseconds.
    pub fn observe_duration(&self, value: Duration) {
        // `as_micros` returns u128; clamp rather than silently truncate.
        let micros = u64::try_from(value.as_micros()).unwrap_or(u64::MAX);
        self.observe_value(micros);
    }

    /// Records a single raw value.
    pub fn observe_value(&self, value: u64) {
        self.inner.lock().record(value);
    }

    /// Returns a consistent, owned copy of the current state.
    pub fn snapshot(&self) -> HistogramSnapshot {
        self.inner.lock().snapshot()
    }
}

/// `Default` mirrors [`Histogram::new`] (clippy: `new_without_default`).
impl Default for Histogram {
    fn default() -> Self {
        Self::new()
    }
}

/// Immutable point-in-time view of a histogram.
#[derive(Clone, Debug)]
pub struct HistogramSnapshot {
    /// Total number of recorded observations.
    pub count: u64,
    /// Sum of all observed values (wide type so large totals cannot overflow).
    pub sum: u128,
    /// Smallest observed value, or 0 when nothing was recorded.
    pub min: u64,
    /// Largest observed value, or 0 when nothing was recorded.
    pub max: u64,
    /// `(upper_bound, count)` pairs; the final entry carries a `u64::MAX`
    /// bound and holds the overflow count.
    pub buckets: Vec<(u64, u64)>,
}

impl HistogramSnapshot {
    /// Returns an upper bound for the requested quantile.
    ///
    /// Yields `None` when `quantile` lies outside `[0, 1]` (including NaN) or
    /// when no samples were recorded. The answer is the upper boundary of the
    /// bucket containing the target rank; the overflow bucket reports the
    /// observed maximum instead of its sentinel bound.
    pub fn percentile(&self, quantile: f64) -> Option<u64> {
        if self.count == 0 || !(0.0..=1.0).contains(&quantile) {
            return None;
        }
        // Rank of the sample that answers the quantile, 1-based.
        let target_rank = ((self.count as f64) * quantile).ceil().max(1.0) as u64;
        let mut seen = 0u64;
        self.buckets
            .iter()
            .find_map(|&(bound, bucket_count)| {
                seen += bucket_count;
                if seen < target_rank {
                    None
                } else if bound == u64::MAX {
                    Some(self.max)
                } else {
                    Some(bound)
                }
            })
            .or(Some(self.max))
    }
}

/// Mutable histogram internals; always accessed behind `Histogram`'s mutex.
#[derive(Clone, Debug)]
struct HistogramState {
    /// Sorted, deduplicated inclusive upper bounds, one per bucket.
    bounds: Vec<u64>,
    /// Per-bucket observation counts; same length as `bounds`.
    counts: Vec<u64>,
    /// Count of observations larger than the last bound.
    overflow: u64,
    /// Total number of observations.
    count: u64,
    /// Sum of all observed values (u128 to avoid overflow).
    sum: u128,
    /// Smallest observation; `u64::MAX` sentinel while empty (reported as 0).
    min: u64,
    /// Largest observation; 0 while empty.
    max: u64,
}

impl HistogramState {
    /// Creates an empty state for the given (sorted, deduplicated) bounds.
    fn new(bounds: Vec<u64>) -> Self {
        let bucket_count = bounds.len();
        Self {
            bounds,
            counts: vec![0; bucket_count],
            overflow: 0,
            count: 0,
            sum: 0,
            min: u64::MAX, // sentinel while empty; snapshot reports 0 instead
            max: 0,
        }
    }

    /// Folds one observation into the totals and the matching bucket.
    fn record(&mut self, value: u64) {
        self.count += 1;
        self.sum += u128::from(value);
        self.min = self.min.min(value);
        self.max = self.max.max(value);

        // Index of the first bucket whose (inclusive) upper bound admits the
        // value; equals `bounds.len()` when the value exceeds every bound.
        let slot = self.bounds.partition_point(|&bound| bound < value);
        match self.counts.get_mut(slot) {
            Some(bucket) => *bucket += 1,
            None => self.overflow += 1,
        }
    }

    /// Produces an owned snapshot; the overflow count is appended as a final
    /// bucket with a `u64::MAX` bound.
    fn snapshot(&self) -> HistogramSnapshot {
        let buckets = self
            .bounds
            .iter()
            .zip(&self.counts)
            .map(|(&bound, &count)| (bound, count))
            .chain(std::iter::once((u64::MAX, self.overflow)))
            .collect();

        // Mask the `u64::MAX` sentinel so an empty histogram reads as 0/0.
        let (min, max) = if self.count == 0 {
            (0, 0)
        } else {
            (self.min, self.max)
        };

        HistogramSnapshot {
            count: self.count,
            sum: self.sum,
            min,
            max,
            buckets,
        }
    }
}

/// Different high level operations exposed by the system.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub enum OperationKind {
    WalAppend,
    MemtableApply,
    Flush,
    Query,
    Compaction,
    Scrub,
}

impl OperationKind {
    /// Returns every variant, in declaration order.
    pub const fn all() -> &'static [OperationKind] {
        use OperationKind::*;
        const ALL: [OperationKind; 6] = [WalAppend, MemtableApply, Flush, Query, Compaction, Scrub];
        &ALL
    }
}

/// Central place where execution time metrics are captured.
///
/// Holds one `Histogram` per `OperationKind`; the map is built once in `new`
/// and never mutated afterwards, so clones are cheap `Arc` reference bumps.
#[derive(Clone, Debug)]
pub struct OperationObserver {
    // Immutable after construction; per-histogram locking handles mutation.
    histograms: Arc<HashMap<OperationKind, Histogram>>,
}

impl OperationObserver {
    /// Creates an observer with an empty histogram for every [`OperationKind`].
    pub fn new() -> Self {
        let mut map = HashMap::with_capacity(OperationKind::all().len());
        for kind in OperationKind::all() {
            map.insert(*kind, Histogram::new());
        }
        Self {
            histograms: Arc::new(map),
        }
    }

    /// Records `duration` into `kind`'s histogram.
    pub fn record_duration(&self, kind: OperationKind, duration: Duration) {
        if let Some(hist) = self.histograms.get(&kind) {
            hist.observe_duration(duration);
        }
    }

    /// Records a raw value into `kind`'s histogram.
    pub fn record_value(&self, kind: OperationKind, value: u64) {
        if let Some(hist) = self.histograms.get(&kind) {
            hist.observe_value(value);
        }
    }

    /// Snapshot of `kind`'s histogram. `None` only if the kind is missing
    /// from the map, which cannot happen for observers built by `new`.
    pub fn histogram(&self, kind: OperationKind) -> Option<HistogramSnapshot> {
        self.histograms.get(&kind).map(Histogram::snapshot)
    }

    /// Starts an RAII timer that records into `kind`'s histogram when dropped.
    #[must_use = "dropping the timer immediately records a near-zero duration"]
    pub fn timer(&self, kind: OperationKind) -> OperationTimer {
        OperationTimer {
            observer: self.clone(),
            kind,
            start: Instant::now(),
        }
    }
}

/// `Default` mirrors [`OperationObserver::new`] (clippy: `new_without_default`).
impl Default for OperationObserver {
    fn default() -> Self {
        Self::new()
    }
}

/// RAII helper that records a duration in the associated histogram when dropped.
///
/// Hold the timer for the span being measured; letting it drop right away
/// records a meaningless near-zero duration, hence `#[must_use]`.
#[must_use = "dropping the timer immediately records a near-zero duration"]
pub struct OperationTimer {
    /// Observer that receives the measurement on drop.
    observer: OperationObserver,
    /// Histogram the measurement is attributed to.
    kind: OperationKind,
    /// Moment the timer was started.
    start: Instant,
}

impl Drop for OperationTimer {
    /// Records the elapsed time since construction into the observer's
    /// histogram for `self.kind`.
    fn drop(&mut self) {
        self.observer
            .record_duration(self.kind, self.start.elapsed());
    }
}

/// Tracks how many bytes of compaction work are still outstanding for each tier.
///
/// One atomic counter per `SegmentTier`, created up front in `new`; clones
/// share the counters through the `Arc`.
#[derive(Clone, Debug)]
pub struct CompactionDebtGauge {
    // Map is immutable after construction; the atomics carry all mutation.
    inner: Arc<HashMap<SegmentTier, AtomicI64>>,
}

impl CompactionDebtGauge {
    /// Creates a gauge with a zeroed counter for every tier.
    pub fn new() -> Self {
        let mut map = HashMap::new();
        for tier in SegmentTier::all() {
            map.insert(tier, AtomicI64::new(0));
        }
        Self {
            inner: Arc::new(map),
        }
    }

    /// Overwrites the outstanding debt for `tier`; unknown tiers are ignored.
    pub fn set_debt(&self, tier: SegmentTier, bytes: i64) {
        if let Some(cell) = self.inner.get(&tier) {
            cell.store(bytes, Ordering::Relaxed);
        }
    }

    /// Adjusts the debt for `tier` by `delta` (the result may go negative);
    /// unknown tiers are ignored.
    pub fn add_debt(&self, tier: SegmentTier, delta: i64) {
        if let Some(cell) = self.inner.get(&tier) {
            cell.fetch_add(delta, Ordering::Relaxed);
        }
    }

    /// Current debt for `tier`, or 0 for unknown tiers.
    pub fn debt(&self, tier: SegmentTier) -> i64 {
        self.inner
            .get(&tier)
            .map(|cell| cell.load(Ordering::Relaxed))
            .unwrap_or(0)
    }

    /// Sum of the debt across all tiers. Each counter is loaded with a
    /// relaxed ordering, so the total is only approximate while concurrent
    /// updates are in flight.
    pub fn total_debt(&self) -> i64 {
        self.inner
            .values()
            .map(|cell| cell.load(Ordering::Relaxed))
            .sum()
    }
}

/// `Default` mirrors [`CompactionDebtGauge::new`] (clippy: `new_without_default`).
impl Default for CompactionDebtGauge {
    fn default() -> Self {
        Self::new()
    }
}

/// Result of running the corruption scrubber across a segment.
#[derive(Clone, Debug)]
pub struct ScrubReport {
    /// Identifier of the scrubbed segment (from its metadata).
    pub segment_id: String,
    /// Pages examined: successful reads plus checksum-mismatched pages.
    pub pages_checked: usize,
    /// Indices of pages whose checksum did not match.
    pub corrupted_pages: Vec<usize>,
    /// Wall-clock time the scrub took.
    pub duration: Duration,
    /// First non-checksum error encountered, if the scan stopped early.
    pub error: Option<String>,
}

/// Scrubs a single segment for checksum mismatches. All failures are recorded in
/// the returned report so that the caller can decide whether they are fatal.
///
/// Checksum mismatches are collected and the scan continues; any other read
/// error stops the scan and is surfaced through `ScrubReport::error`.
pub fn scrub_segment(segment: &dyn Segment) -> ScrubReport {
    let started = Instant::now();
    let mut pages_checked = 0usize;
    let mut corrupted_pages = Vec::new();
    let mut error = None;

    match segment.open_reader() {
        Err(err) => error = Some(err.to_string()),
        Ok(reader) => {
            for page_index in 0..reader.page_count() {
                match reader.read_page(page_index) {
                    Ok(_) => pages_checked += 1,
                    // A bad checksum still counts as a checked page.
                    Err(StorageError::ChecksumMismatch { page, .. }) => {
                        pages_checked += 1;
                        corrupted_pages.push(page);
                    }
                    // Any other error aborts the scan for this segment.
                    Err(other) => {
                        error = Some(other.to_string());
                        break;
                    }
                }
            }
        }
    }

    ScrubReport {
        segment_id: segment.metadata().id.clone(),
        pages_checked,
        corrupted_pages,
        duration: started.elapsed(),
        error,
    }
}

/// Helper that ties the histogram instrumentation with the scrubber.
#[derive(Clone, Debug)]
pub struct Scrubber {
    // Each scrub's duration is recorded under `OperationKind::Scrub`.
    metrics: OperationObserver,
}

impl Scrubber {
    /// Creates a scrubber that reports timings into `metrics`.
    pub fn new(metrics: OperationObserver) -> Self {
        Self { metrics }
    }

    /// Scrubs one segment, timing the run under `OperationKind::Scrub`.
    pub fn scrub(&self, segment: &dyn Segment) -> ScrubReport {
        let timer = self.metrics.timer(OperationKind::Scrub);
        let report = scrub_segment(segment);
        // Stop the clock before handing the report back.
        drop(timer);
        report
    }

    /// Scrubs each segment in turn, returning one report per segment.
    pub fn scrub_many<'a>(
        &self,
        segments: impl IntoIterator<Item = &'a dyn Segment>,
    ) -> Vec<ScrubReport> {
        let mut reports = Vec::new();
        for segment in segments {
            reports.push(self.scrub(segment));
        }
        reports
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    // Five samples all fall into the first default bucket (bound 5), so the
    // median reports that bucket's upper bound.
    #[test]
    fn histogram_records_percentiles() {
        let hist = Histogram::new();
        for sample in 1_u64..=5 {
            hist.observe_value(sample);
        }
        let snap = hist.snapshot();
        assert_eq!(snap.count, 5);
        assert_eq!(snap.percentile(0.5), Some(5));
    }

    // `set_debt` overwrites, `add_debt` accumulates, and the total spans tiers.
    #[test]
    fn debt_gauge_accumulates() {
        let gauge = CompactionDebtGauge::new();
        gauge.set_debt(SegmentTier::Hot, 10);
        gauge.add_debt(SegmentTier::Warm, 5);
        assert_eq!(gauge.debt(SegmentTier::Hot), 10);
        assert_eq!(gauge.total_debt(), 15);
    }
}