#![allow(dead_code)]
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, Ordering};
use std::time::{Duration, Instant};
use parking_lot::Mutex;
use crate::storage::manifest::SegmentTier;
use crate::storage::{Segment, StorageError};
/// Default histogram bucket upper bounds, in ascending order.
///
/// `Histogram::new` builds its buckets from this table. `Histogram::observe_duration`
/// records durations as whole microseconds, so for timing histograms these bounds
/// run from 5 µs up to 10 s; anything above the last bound is counted in the
/// overflow bucket.
const DEFAULT_BUCKETS: &[u64] = &[
5, 10, 20, 50, 100, 200, 500, 1_000, 2_000, 5_000, 10_000, 20_000, 50_000, 100_000, 200_000,
500_000, 1_000_000, 2_000_000, 5_000_000, 10_000_000,
];
/// Cloneable handle to a fixed-bucket histogram.
///
/// The state lives behind an `Arc<Mutex<_>>`, so every clone of a handle records
/// into, and snapshots from, the same underlying histogram.
#[derive(Clone, Debug)]
pub struct Histogram {
    inner: Arc<Mutex<HistogramState>>,
}

impl Histogram {
    /// Creates a histogram whose buckets use `bounds` as inclusive upper bounds.
    ///
    /// The bounds are sorted and de-duplicated before use, so callers may pass
    /// them in any order.
    ///
    /// # Panics
    ///
    /// Panics if `bounds` is empty.
    pub fn with_bounds(bounds: Vec<u64>) -> Self {
        assert!(
            !bounds.is_empty(),
            "histogram must have at least one bucket"
        );
        let mut bounds = bounds;
        bounds.sort_unstable();
        bounds.dedup();
        Self {
            inner: Arc::new(Mutex::new(HistogramState::new(bounds))),
        }
    }

    /// Creates a histogram using the `DEFAULT_BUCKETS` bounds.
    pub fn new() -> Self {
        Self::with_bounds(DEFAULT_BUCKETS.to_vec())
    }

    /// Records `value` as a whole number of microseconds.
    ///
    /// Durations too long to fit in a `u64` microsecond count are recorded as
    /// `u64::MAX`.
    pub fn observe_duration(&self, value: Duration) {
        let micros = value.as_micros();
        let micros = micros.min(u64::MAX as u128) as u64;
        self.observe_value(micros);
    }

    /// Records a raw observation.
    pub fn observe_value(&self, value: u64) {
        let mut state = self.inner.lock();
        state.record(value);
    }

    /// Returns a copy of the current totals and per-bucket counts.
    pub fn snapshot(&self) -> HistogramSnapshot {
        let state = self.inner.lock();
        state.snapshot()
    }
}

impl Default for Histogram {
    /// Equivalent to [`Histogram::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Point-in-time copy of a histogram's totals and bucket counts.
#[derive(Clone, Debug)]
pub struct HistogramSnapshot {
    /// Total number of recorded observations.
    pub count: u64,
    /// Sum of all recorded observations.
    pub sum: u128,
    /// Smallest recorded observation (`0` in snapshots of an empty histogram).
    pub min: u64,
    /// Largest recorded observation (`0` in snapshots of an empty histogram).
    pub max: u64,
    /// `(upper_bound, count)` pairs in ascending bound order. Snapshots taken
    /// from a histogram end with a `(u64::MAX, overflow)` entry that counts the
    /// observations above the last configured bound.
    pub buckets: Vec<(u64, u64)>,
}

impl HistogramSnapshot {
    /// Estimates the value at `quantile` (a fraction in `0.0..=1.0`).
    ///
    /// The estimate is the upper bound of the bucket that contains the
    /// requested rank, clamped to the observed maximum so the result never
    /// exceeds a value that was actually recorded.
    ///
    /// Returns `None` when the snapshot holds no observations or when
    /// `quantile` lies outside `0.0..=1.0` (including NaN).
    pub fn percentile(&self, quantile: f64) -> Option<u64> {
        if self.count == 0 || !(0.0..=1.0).contains(&quantile) {
            return None;
        }
        // Rank of the requested observation, 1-based; quantile 0 maps to the
        // first observation.
        let rank = ((self.count as f64) * quantile).ceil().max(1.0) as u64;
        let mut cumulative = 0u64;
        for &(bound, count) in &self.buckets {
            // Fields are public, so a hand-built snapshot may carry counts that
            // would overflow a plain addition.
            cumulative = cumulative.saturating_add(count);
            if cumulative >= rank {
                // A bucket bound can be far above anything observed (the
                // overflow bucket's bound is `u64::MAX`); `max` is a tighter,
                // always-valid upper estimate.
                return Some(bound.min(self.max));
            }
        }
        // Bucket counts did not reach the rank; fall back to the maximum.
        Some(self.max)
    }
}
/// Mutable accumulator behind a histogram handle.
///
/// `bounds` holds the inclusive upper bound of each bucket and `counts` the
/// matching per-bucket tallies; values above the last bound go to `overflow`.
#[derive(Clone, Debug)]
struct HistogramState {
    bounds: Vec<u64>,
    counts: Vec<u64>,
    overflow: u64,
    count: u64,
    sum: u128,
    min: u64,
    max: u64,
}

impl HistogramState {
    /// Builds an empty state with one zeroed counter per bound.
    ///
    /// `min` starts at `u64::MAX` and `max` at `0` so the first recorded value
    /// replaces both.
    fn new(bounds: Vec<u64>) -> Self {
        let counts = vec![0; bounds.len()];
        Self {
            bounds,
            counts,
            overflow: 0,
            count: 0,
            sum: 0,
            min: u64::MAX,
            max: 0,
        }
    }

    /// Folds one observation into the totals and into its bucket.
    fn record(&mut self, value: u64) {
        self.count += 1;
        self.sum += u128::from(value);
        if value < self.min {
            self.min = value;
        }
        if value > self.max {
            self.max = value;
        }

        // An exact hit and an insertion point both identify the bucket whose
        // bound is the first one >= value.
        let slot = self
            .bounds
            .binary_search(&value)
            .unwrap_or_else(|insert_at| insert_at);
        match self.counts.get_mut(slot) {
            Some(hits) => *hits += 1,
            // Past the last bound: no regular bucket can hold the value.
            None => self.overflow += 1,
        }
    }

    /// Copies the current totals into a `HistogramSnapshot`.
    ///
    /// The bucket list mirrors `bounds`/`counts` and ends with a
    /// `(u64::MAX, overflow)` entry. `min` and `max` are reported as `0` while
    /// nothing has been recorded.
    fn snapshot(&self) -> HistogramSnapshot {
        let (min, max) = if self.count == 0 {
            (0, 0)
        } else {
            (self.min, self.max)
        };
        let buckets = self
            .bounds
            .iter()
            .zip(&self.counts)
            .map(|(&bound, &hits)| (bound, hits))
            .chain(std::iter::once((u64::MAX, self.overflow)))
            .collect();
        HistogramSnapshot {
            count: self.count,
            sum: self.sum,
            min,
            max,
            buckets,
        }
    }
}
/// Categories of operations that get their own latency/size histogram.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub enum OperationKind {
    WalAppend,
    MemtableApply,
    Flush,
    Query,
    Compaction,
    Scrub,
}

impl OperationKind {
    /// Every variant, in declaration order.
    const ALL: &'static [OperationKind] = &[
        Self::WalAppend,
        Self::MemtableApply,
        Self::Flush,
        Self::Query,
        Self::Compaction,
        Self::Scrub,
    ];

    /// Returns all operation kinds in declaration order.
    pub const fn all() -> &'static [OperationKind] {
        Self::ALL
    }
}
/// Registry holding one [`Histogram`] per [`OperationKind`].
///
/// The map is built once and shared through an `Arc`, so clones of an observer
/// record into the same histograms.
#[derive(Clone, Debug)]
pub struct OperationObserver {
    histograms: Arc<HashMap<OperationKind, Histogram>>,
}

impl OperationObserver {
    /// Creates an observer with a default-bucket histogram for every kind
    /// listed by [`OperationKind::all`].
    pub fn new() -> Self {
        let histograms: HashMap<OperationKind, Histogram> = OperationKind::all()
            .iter()
            .map(|&kind| (kind, Histogram::new()))
            .collect();
        Self {
            histograms: Arc::new(histograms),
        }
    }

    /// Records `duration` into the histogram for `kind`.
    ///
    /// Does nothing if no histogram is registered for `kind`.
    pub fn record_duration(&self, kind: OperationKind, duration: Duration) {
        if let Some(hist) = self.histograms.get(&kind) {
            hist.observe_duration(duration);
        }
    }

    /// Records a raw `value` into the histogram for `kind`.
    ///
    /// Does nothing if no histogram is registered for `kind`.
    pub fn record_value(&self, kind: OperationKind, value: u64) {
        if let Some(hist) = self.histograms.get(&kind) {
            hist.observe_value(value);
        }
    }

    /// Returns a snapshot of the histogram for `kind`, or `None` if no
    /// histogram is registered for it.
    pub fn histogram(&self, kind: OperationKind) -> Option<HistogramSnapshot> {
        self.histograms.get(&kind).map(Histogram::snapshot)
    }

    /// Starts a timer that records its elapsed time under `kind` when dropped.
    ///
    /// Keep the returned value alive for the span being measured; it holds its
    /// own clone of this observer.
    pub fn timer(&self, kind: OperationKind) -> OperationTimer {
        OperationTimer {
            observer: self.clone(),
            kind,
            start: Instant::now(),
        }
    }
}

impl Default for OperationObserver {
    /// Equivalent to [`OperationObserver::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Guard that measures the time between its creation and its drop.
///
/// Obtained from `OperationObserver::timer`; the measurement is reported to the
/// observer it was created from.
pub struct OperationTimer {
    observer: OperationObserver,
    kind: OperationKind,
    start: Instant,
}

impl Drop for OperationTimer {
    /// Reports the time elapsed since `start` under this timer's kind.
    fn drop(&mut self) {
        self.observer
            .record_duration(self.kind, self.start.elapsed());
    }
}
/// Per-tier compaction debt counters.
///
/// Each tier returned by `SegmentTier::all()` gets one atomic signed counter.
/// The map is shared through an `Arc`, so clones of a gauge observe and update
/// the same counters. All accesses use `Ordering::Relaxed`.
#[derive(Clone, Debug)]
pub struct CompactionDebtGauge {
    inner: Arc<HashMap<SegmentTier, AtomicI64>>,
}

impl CompactionDebtGauge {
    /// Creates a gauge with a zeroed counter for every tier.
    pub fn new() -> Self {
        let mut map = HashMap::new();
        for tier in SegmentTier::all() {
            map.insert(tier, AtomicI64::new(0));
        }
        Self {
            inner: Arc::new(map),
        }
    }

    /// Overwrites the debt recorded for `tier`.
    ///
    /// Does nothing if `tier` has no counter.
    pub fn set_debt(&self, tier: SegmentTier, bytes: i64) {
        if let Some(cell) = self.inner.get(&tier) {
            cell.store(bytes, Ordering::Relaxed);
        }
    }

    /// Adds `delta` (which may be negative) to the debt recorded for `tier`.
    ///
    /// Does nothing if `tier` has no counter.
    pub fn add_debt(&self, tier: SegmentTier, delta: i64) {
        if let Some(cell) = self.inner.get(&tier) {
            cell.fetch_add(delta, Ordering::Relaxed);
        }
    }

    /// Returns the debt recorded for `tier`, or `0` if `tier` has no counter.
    pub fn debt(&self, tier: SegmentTier) -> i64 {
        self.inner
            .get(&tier)
            .map(|cell| cell.load(Ordering::Relaxed))
            .unwrap_or(0)
    }

    /// Returns the sum of the debt across all tiers.
    ///
    /// The sum is accumulated in `i128` and saturated to the `i64` range, so
    /// reading the gauge cannot panic or wrap on overflow and the result does
    /// not depend on map iteration order.
    pub fn total_debt(&self) -> i64 {
        let total: i128 = self
            .inner
            .values()
            .map(|cell| i128::from(cell.load(Ordering::Relaxed)))
            .sum();
        total
            .max(i128::from(i64::MIN))
            .min(i128::from(i64::MAX)) as i64
    }
}

impl Default for CompactionDebtGauge {
    /// Equivalent to [`CompactionDebtGauge::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Outcome of scrubbing one segment, as produced by `scrub_segment`.
#[derive(Clone, Debug)]
pub struct ScrubReport {
/// Identifier copied from the segment's metadata.
pub segment_id: String,
/// Number of pages visited: pages that read cleanly plus pages that failed
/// their checksum.
pub pages_checked: usize,
/// Page numbers reported by checksum-mismatch errors, in the order encountered.
pub corrupted_pages: Vec<usize>,
/// Time spent on the scrub.
pub duration: Duration,
/// Message of the error that stopped the scrub early (failure to open the
/// reader, or a page read error other than a checksum mismatch); `None` if
/// every page was visited.
pub error: Option<String>,
}
/// Reads every page of `segment` and reports which ones fail verification.
///
/// Pages that read cleanly and pages that fail with a checksum mismatch both
/// count as checked; mismatching pages are also listed as corrupted. Any other
/// read error, or a failure to open the reader, stops the scan and is returned
/// as the report's `error` message. This function itself never fails.
pub fn scrub_segment(segment: &dyn Segment) -> ScrubReport {
    let started = Instant::now();
    let mut pages_checked = 0usize;
    let mut corrupted_pages = Vec::new();

    let error = match segment.open_reader() {
        Err(open_err) => Some(open_err.to_string()),
        Ok(reader) => {
            let mut failure = None;
            for index in 0..reader.page_count() {
                match reader.read_page(index) {
                    Ok(_) => pages_checked += 1,
                    Err(StorageError::ChecksumMismatch { page, .. }) => {
                        // The page was reachable but its contents are bad;
                        // keep scanning the remaining pages.
                        pages_checked += 1;
                        corrupted_pages.push(page);
                    }
                    Err(other) => {
                        // Anything else aborts the scan.
                        failure = Some(other.to_string());
                        break;
                    }
                }
            }
            failure
        }
    };

    ScrubReport {
        segment_id: segment.metadata().id.clone(),
        pages_checked,
        corrupted_pages,
        duration: started.elapsed(),
        error,
    }
}
/// Runs segment scrubs and records how long each one takes.
#[derive(Clone, Debug)]
pub struct Scrubber {
    metrics: OperationObserver,
}

impl Scrubber {
    /// Creates a scrubber that reports timings to `metrics`.
    pub fn new(metrics: OperationObserver) -> Self {
        Scrubber { metrics }
    }

    /// Scrubs one segment, recording the elapsed time under
    /// `OperationKind::Scrub`.
    pub fn scrub(&self, segment: &dyn Segment) -> ScrubReport {
        // The guard is dropped when this function returns, i.e. after the
        // scrub has finished, which is when the duration gets recorded.
        let _timer = self.metrics.timer(OperationKind::Scrub);
        scrub_segment(segment)
    }

    /// Scrubs each segment in turn and returns the reports in the same order.
    pub fn scrub_many<'a>(
        &self,
        segments: impl IntoIterator<Item = &'a dyn Segment>,
    ) -> Vec<ScrubReport> {
        let mut reports = Vec::new();
        for segment in segments {
            reports.push(self.scrub(segment));
        }
        reports
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// All five samples fall into the first default bucket (bound 5), so the
    /// median resolves to 5.
    #[test]
    fn histogram_records_percentiles() {
        let hist = Histogram::new();
        (1..=5u64).for_each(|sample| hist.observe_value(sample));

        let snap = hist.snapshot();
        assert_eq!(snap.count, 5);
        assert_eq!(snap.percentile(0.5), Some(5));
    }

    /// Setting one tier and adding to another is reflected per tier and in the
    /// total.
    #[test]
    fn debt_gauge_accumulates() {
        let debt = CompactionDebtGauge::new();
        debt.set_debt(SegmentTier::Hot, 10);
        debt.add_debt(SegmentTier::Warm, 5);

        assert_eq!(debt.debt(SegmentTier::Hot), 10);
        assert_eq!(debt.total_debt(), 15);
    }
}