use std::collections::HashMap;
use std::fs;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tempfile::TempDir;
use super::{execution, finalize_pending_compaction_replacements, Compactor};
use crate::engine::chunk::{Chunk, ChunkHeader, ChunkPoint, ValueLane};
use crate::engine::encoder::Encoder;
use crate::engine::segment::{
load_segments, load_segments_for_level, SegmentWriter, WalHighWatermark,
};
use crate::engine::series::{SeriesRegistry, SeriesValueFamily};
use crate::engine::tombstone::{persist_tombstones, TombstoneRange, TOMBSTONES_FILE_NAME};
use crate::{
HistogramBucketSpan, HistogramCount, HistogramResetHint, Label, NativeHistogram, Value,
};
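
// A fully populated native histogram fixture; the histogram tests compare
// decoded points against it to show compaction preserves the whole payload.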
fn sample_histogram() -> NativeHistogram {
NativeHistogram {
count: Some(HistogramCount::Int(42)),
sum: 17.5,
schema: 1,
zero_threshold: 0.001,
zero_count: Some(HistogramCount::Int(7)),
negative_spans: vec![HistogramBucketSpan {
offset: -1,
length: 1,
}],
negative_deltas: vec![2],
negative_counts: vec![],
positive_spans: vec![HistogramBucketSpan {
offset: 0,
length: 2,
}],
positive_deltas: vec![3, 1],
positive_counts: vec![],
reset_hint: HistogramResetHint::No,
custom_values: vec![0.25, 0.5],
}
}
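
// Two overlapping L0 segments should merge into a single L1 segment with the
// points from both inputs interleaved in timestamp order (10, 15, 20, 30).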
#[test]
fn compacts_overlapping_l0_segments_into_l1() {
let temp_dir = TempDir::new().unwrap();
let registry = SeriesRegistry::new();
let series_id = registry
.resolve_or_insert("cpu", &[Label::new("host", "a")])
.unwrap()
.series_id;
let first_chunk = make_numeric_chunk(series_id, &[(10, 1.0), (20, 2.0)]);
let second_chunk = make_numeric_chunk(series_id, &[(15, 3.0), (30, 4.0)]);
let mut seg1_chunks = HashMap::new();
seg1_chunks.insert(series_id, vec![first_chunk]);
let mut seg2_chunks = HashMap::new();
seg2_chunks.insert(series_id, vec![second_chunk]);
SegmentWriter::new(temp_dir.path(), 0, 1)
.unwrap()
.write_segment(&registry, &seg1_chunks)
.unwrap();
SegmentWriter::new(temp_dir.path(), 0, 2)
.unwrap()
.write_segment(&registry, &seg2_chunks)
.unwrap();
let compactor = Compactor::new(temp_dir.path(), 8);
compactor.compact_once().unwrap();
let l0 = load_segments_for_level(temp_dir.path(), 0).unwrap();
let l1 = load_segments_for_level(temp_dir.path(), 1).unwrap();
assert!(l0.is_empty());
assert_eq!(l1.len(), 1);
let loaded = load_segments(temp_dir.path()).unwrap();
let chunks = loaded.chunks_by_series.get(&series_id).unwrap();
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].header.point_count, 4);
let decoded = execution::decode_chunk_points_for_compaction(&chunks[0]).unwrap();
assert_eq!(decoded[0].ts, 10);
assert_eq!(decoded[3].ts, 30);
}
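
// Histogram chunks travel through compaction on the blob lane; every decoded
// point must still carry the original histogram payload.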
#[test]
fn compaction_preserves_histogram_payloads() {
let temp_dir = TempDir::new().unwrap();
let registry = SeriesRegistry::new();
let series_id = registry
.resolve_or_insert("rpc_duration_seconds", &[Label::new("job", "backend")])
.unwrap()
.series_id;
let first_chunk = make_histogram_chunk(series_id, &[10, 20]);
let second_chunk = make_histogram_chunk(series_id, &[30, 40]);
let mut seg1_chunks = HashMap::new();
seg1_chunks.insert(series_id, vec![first_chunk]);
let mut seg2_chunks = HashMap::new();
seg2_chunks.insert(series_id, vec![second_chunk]);
SegmentWriter::new(temp_dir.path(), 0, 1)
.unwrap()
.write_segment(&registry, &seg1_chunks)
.unwrap();
SegmentWriter::new(temp_dir.path(), 0, 2)
.unwrap()
.write_segment(&registry, &seg2_chunks)
.unwrap();
let compactor = Compactor::new(temp_dir.path(), 8);
compactor.compact_once().unwrap();
let loaded = load_segments(temp_dir.path()).unwrap();
let chunks = loaded.chunks_by_series.get(&series_id).unwrap();
let decoded = chunks
.iter()
.flat_map(|chunk| execution::decode_chunk_points_for_compaction(chunk).unwrap())
.collect::<Vec<_>>();
assert_eq!(decoded.len(), 4);
for (point, ts) in decoded.iter().zip([10, 20, 30, 40]) {
assert_eq!(point.ts, ts);
assert_eq!(point.value, Value::from(sample_histogram()));
}
}
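
// When the compactor is handed a shared segment-id allocator, the output
// segment takes the next id from it (100) and the counter advances to 101.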
#[test]
fn compactor_uses_shared_segment_id_allocator() {
let temp_dir = TempDir::new().unwrap();
let registry = SeriesRegistry::new();
let series_id = registry
.resolve_or_insert("cpu", &[Label::new("host", "a")])
.unwrap()
.series_id;
let first_chunk = make_numeric_chunk(series_id, &[(10, 1.0), (20, 2.0)]);
let second_chunk = make_numeric_chunk(series_id, &[(15, 3.0), (30, 4.0)]);
let mut seg1_chunks = HashMap::new();
seg1_chunks.insert(series_id, vec![first_chunk]);
let mut seg2_chunks = HashMap::new();
seg2_chunks.insert(series_id, vec![second_chunk]);
SegmentWriter::new(temp_dir.path(), 0, 1)
.unwrap()
.write_segment(&registry, &seg1_chunks)
.unwrap();
SegmentWriter::new(temp_dir.path(), 0, 2)
.unwrap()
.write_segment(&registry, &seg2_chunks)
.unwrap();
let next_segment_id = Arc::new(AtomicU64::new(100));
let compactor =
Compactor::new_with_segment_id_allocator(temp_dir.path(), 8, Arc::clone(&next_segment_id));
compactor.compact_once().unwrap();
let l1 = load_segments_for_level(temp_dir.path(), 1).unwrap();
assert_eq!(l1.len(), 1);
assert_eq!(l1[0].manifest.segment_id, 100);
assert_eq!(next_segment_id.load(Ordering::SeqCst), 101);
}
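
// One compaction pass consumes at most a fixed window of the oldest L0
// segments: of six inputs, the four oldest are merged into L1 and the two
// newest (ids 5 and 6) are left behind for a later pass.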
#[test]
fn compactor_limits_source_window_per_pass() {
let temp_dir = TempDir::new().unwrap();
let registry = SeriesRegistry::new();
let series_id = registry
.resolve_or_insert("cpu", &[Label::new("host", "a")])
.unwrap()
.series_id;
for segment_id in 1..=6 {
let mut chunks = HashMap::new();
chunks.insert(
series_id,
vec![make_numeric_chunk(
series_id,
&[(segment_id as i64, segment_id as f64)],
)],
);
SegmentWriter::new(temp_dir.path(), 0, segment_id)
.unwrap()
.write_segment(&registry, &chunks)
.unwrap();
}
let compactor = Compactor::new(temp_dir.path(), 8);
compactor.compact_once().unwrap();
let l0 = load_segments_for_level(temp_dir.path(), 0).unwrap();
let l1 = load_segments_for_level(temp_dir.path(), 1).unwrap();
assert_eq!(l0.len(), 2);
assert_eq!(l1.len(), 1);
assert_eq!(
l0.iter()
.map(|segment| segment.manifest.segment_id)
.collect::<Vec<_>>(),
vec![5, 6]
);
}
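
// Merging 1200 points under a tiny chunk size (2) must split the output
// across multiple L1 segments, each bounded by chunk size times the output
// multiplier, without losing any points.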
#[test]
fn compactor_splits_large_output_into_multiple_segments() {
let temp_dir = TempDir::new().unwrap();
let registry = SeriesRegistry::new();
let series_id = registry
.resolve_or_insert("cpu", &[Label::new("host", "a")])
.unwrap()
.series_id;
for segment_id in 1..=4 {
let start = (segment_id as i64 - 1) * 300;
let points = (start..start + 300)
.map(|ts| (ts, ts as f64))
.collect::<Vec<_>>();
let mut chunks = HashMap::new();
chunks.insert(series_id, vec![make_numeric_chunk(series_id, &points)]);
SegmentWriter::new(temp_dir.path(), 0, segment_id)
.unwrap()
.write_segment(&registry, &chunks)
.unwrap();
}
let compactor = Compactor::new(temp_dir.path(), 2);
compactor.compact_once().unwrap();
let l0 = load_segments_for_level(temp_dir.path(), 0).unwrap();
let l1 = load_segments_for_level(temp_dir.path(), 1).unwrap();
assert!(l0.is_empty());
assert!(l1.len() >= 2);
assert!(l1.iter().all(|segment| {
segment.manifest.point_count <= 2 * super::DEFAULT_OUTPUT_SEGMENT_CHUNK_MULTIPLIER
}));
let loaded = load_segments(temp_dir.path()).unwrap();
let chunks = loaded.chunks_by_series.get(&series_id).unwrap();
let total_points = chunks
.iter()
.map(|chunk| chunk.header.point_count as usize)
.sum::<usize>();
assert_eq!(total_points, 1200);
}
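
// Compaction must drop points covered by a persisted tombstone range:
// timestamps 15 and 20 fall inside it here, so only 10 and 30 survive.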
#[test]
fn compactor_purges_tombstoned_points() {
let temp_dir = TempDir::new().unwrap();
let registry = SeriesRegistry::new();
let series_id = registry
.resolve_or_insert("cpu", &[Label::new("host", "a")])
.unwrap()
.series_id;
let mut seg1_chunks = HashMap::new();
seg1_chunks.insert(
series_id,
vec![make_numeric_chunk(series_id, &[(10, 1.0), (20, 2.0)])],
);
SegmentWriter::new(temp_dir.path(), 0, 1)
.unwrap()
.write_segment(&registry, &seg1_chunks)
.unwrap();
let mut seg2_chunks = HashMap::new();
seg2_chunks.insert(
series_id,
vec![make_numeric_chunk(series_id, &[(15, 3.0), (30, 4.0)])],
);
SegmentWriter::new(temp_dir.path(), 0, 2)
.unwrap()
.write_segment(&registry, &seg2_chunks)
.unwrap();
let mut tombstones = HashMap::new();
tombstones.insert(series_id, vec![TombstoneRange { start: 15, end: 21 }]);
persist_tombstones(&temp_dir.path().join(TOMBSTONES_FILE_NAME), &tombstones).unwrap();
let compactor = Compactor::new(temp_dir.path(), 8);
assert!(compactor.compact_once().unwrap());
let loaded = load_segments(temp_dir.path()).unwrap();
let chunks = loaded.chunks_by_series.get(&series_id).unwrap();
let decoded = execution::decode_chunk_points_for_compaction(&chunks[0]).unwrap();
let timestamps = decoded
.into_iter()
.map(|point| point.ts)
.collect::<Vec<_>>();
assert_eq!(timestamps, vec![10, 30]);
}
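
// Corrupting one source manifest must fail the whole pass: the error names
// the corrupt root, both source roots stay on disk, and no replacement
// output is published to L1.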
#[test]
fn compaction_fails_when_any_source_segment_is_corrupted() {
let temp_dir = TempDir::new().unwrap();
let registry = SeriesRegistry::new();
let series_id = registry
.resolve_or_insert("cpu", &[Label::new("host", "a")])
.unwrap()
.series_id;
let first_writer = SegmentWriter::new(temp_dir.path(), 0, 1).unwrap();
let mut first_chunks = HashMap::new();
first_chunks.insert(
series_id,
vec![make_numeric_chunk(series_id, &[(10, 1.0), (20, 2.0)])],
);
first_writer
.write_segment(&registry, &first_chunks)
.unwrap();
let second_writer = SegmentWriter::new(temp_dir.path(), 0, 2).unwrap();
let mut second_chunks = HashMap::new();
second_chunks.insert(
series_id,
vec![make_numeric_chunk(series_id, &[(15, 3.0), (30, 4.0)])],
);
second_writer
.write_segment(&registry, &second_chunks)
.unwrap();
let corrupt_root = first_writer.layout().root.clone();
// Flip one byte in the first segment's manifest so its crc32 check fails.
let manifest_path = first_writer.layout().manifest_path.clone();
let mut manifest_bytes = fs::read(&manifest_path).unwrap();
manifest_bytes[0] ^= 0xff;
fs::write(&manifest_path, manifest_bytes).unwrap();
let compactor = Compactor::new(temp_dir.path(), 8);
let err = compactor.compact_once().unwrap_err();
let message = err.to_string();
assert!(
message.contains("failed compaction validation"),
"unexpected error: {message}"
);
assert!(
message.contains("manifest crc32 mismatch"),
"unexpected error: {message}"
);
assert!(
message.contains(&corrupt_root.display().to_string()),
"unexpected error: {message}"
);
assert!(
corrupt_root.exists(),
"corrupt source root should stay in place"
);
assert!(
second_writer.layout().root.exists(),
"healthy source root should not be compacted away"
);
assert!(
load_segments_for_level(temp_dir.path(), 1)
.unwrap()
.is_empty(),
"compaction must not publish replacement output when any source root is corrupt"
);
}
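
// A pending replacement marker records which source roots a finished
// compaction replaces; finalizing it deletes the sources, keeps the output,
// and removes the marker itself.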
#[test]
fn finalize_pending_replacements_removes_marked_source_segments() {
let temp_dir = TempDir::new().unwrap();
let registry = SeriesRegistry::new();
let series_id = registry
.resolve_or_insert("cpu", &[Label::new("host", "a")])
.unwrap()
.series_id;
let mut l0_chunks = HashMap::new();
l0_chunks.insert(series_id, vec![make_numeric_chunk(series_id, &[(1, 1.0)])]);
SegmentWriter::new(temp_dir.path(), 0, 1)
.unwrap()
.write_segment(&registry, &l0_chunks)
.unwrap();
let mut l1_chunks = HashMap::new();
l1_chunks.insert(series_id, vec![make_numeric_chunk(series_id, &[(1, 2.0)])]);
SegmentWriter::new(temp_dir.path(), 1, 2)
.unwrap()
.write_segment(&registry, &l1_chunks)
.unwrap();
let l0 = load_segments_for_level(temp_dir.path(), 0).unwrap();
let l1 = load_segments_for_level(temp_dir.path(), 1).unwrap();
assert_eq!(l0.len(), 1);
assert_eq!(l1.len(), 1);
let source_root = l0[0].root.clone();
let output_root = l1[0].root.clone();
let marker_path = execution::write_compaction_replacement_marker(
temp_dir.path(),
std::slice::from_ref(&source_root),
std::slice::from_ref(&output_root),
)
.unwrap();
assert!(marker_path.exists());
finalize_pending_compaction_replacements(temp_dir.path()).unwrap();
assert!(!source_root.exists(), "source segment should be removed");
assert!(
output_root.exists(),
"replacement output should be preserved"
);
assert!(
!marker_path.exists(),
"replacement marker should be removed after apply"
);
}
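
// Encodes (timestamp, f64) pairs into a numeric-lane chunk whose header is
// derived from the encoded payload and the point bounds.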
fn make_numeric_chunk(series_id: u64, points: &[(i64, f64)]) -> Chunk {
let points = points
.iter()
.map(|(ts, value)| ChunkPoint {
ts: *ts,
value: Value::F64(*value),
})
.collect::<Vec<_>>();
let encoded = Encoder::encode_chunk_points(&points, ValueLane::Numeric).unwrap();
Chunk {
header: ChunkHeader {
series_id,
lane: ValueLane::Numeric,
value_family: Some(SeriesValueFamily::F64),
point_count: points.len() as u16,
min_ts: points.first().unwrap().ts,
max_ts: points.last().unwrap().ts,
ts_codec: encoded.ts_codec,
value_codec: encoded.value_codec,
},
points,
encoded_payload: encoded.payload,
wal_highwater: WalHighWatermark::default(),
}
}
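
// Encodes one copy of the sample histogram per timestamp into a blob-lane
// chunk; the histogram counterpart of make_numeric_chunk.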
fn make_histogram_chunk(series_id: u64, timestamps: &[i64]) -> Chunk {
let histogram = sample_histogram();
let points = timestamps
.iter()
.map(|ts| ChunkPoint {
ts: *ts,
value: Value::from(histogram.clone()),
})
.collect::<Vec<_>>();
let encoded = Encoder::encode_chunk_points(&points, ValueLane::Blob).unwrap();
Chunk {
header: ChunkHeader {
series_id,
lane: ValueLane::Blob,
value_family: Some(SeriesValueFamily::Histogram),
point_count: points.len() as u16,
min_ts: points.first().unwrap().ts,
max_ts: points.last().unwrap().ts,
ts_codec: encoded.ts_codec,
value_codec: encoded.value_codec,
},
points,
encoded_payload: encoded.payload,
wal_highwater: WalHighWatermark::default(),
}
}