use std::path::Path;
use super::compress::DictionaryRegistry;
use nodedb_types::timeseries::{SegmentKind, SeriesId, TimeRange};
use super::reader::{self, MetricAggregation, SegmentReadError};
use super::segment_index::SegmentIndex;
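/// Result of a metric query: raw samples, a single aggregation, or
/// downsampled points, depending on which query path produced it.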
#[derive(Debug)]
pub enum MetricQueryResult {
Samples(Vec<(i64, f64)>),
Aggregation(MetricAggregation),
Downsampled(Vec<(i64, f64)>),
}
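/// Result of a log query over one series.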
#[derive(Debug)]
pub struct LogQueryResult {
pub entries: Vec<nodedb_types::timeseries::LogEntry>,
}
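/// Errors that can occur while serving a timeseries query.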
#[derive(Debug, thiserror::Error)]
pub enum QueryError {
#[error("segment read error: {0}")]
SegmentRead(#[from] SegmentReadError),
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
#[error("no data found for series {series_id}")]
NoData { series_id: SeriesId },
}
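/// Read-only query engine over flushed L1 timeseries segments. It borrows
/// the segment index, the L1 segment directory, and the log dictionary
/// registry for the lifetime of the engine and performs no writes.
///
/// A minimal usage sketch (setup of the index, directory, and registry is
/// assumed, hence `ignore`):
///
/// ```ignore
/// let engine = TimeseriesQueryEngine::new(&segment_index, &l1_dir, &registry);
/// let samples = engine.scan_metrics(series_id, &TimeRange::new(0, now_ms))?;
/// let agg = engine.aggregate_metrics(series_id, &TimeRange::new(0, now_ms))?;
/// ```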
pub struct TimeseriesQueryEngine<'a> {
segment_index: &'a SegmentIndex,
l1_dir: &'a Path,
log_registry: &'a DictionaryRegistry,
}
impl<'a> TimeseriesQueryEngine<'a> {
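    /// Creates a query engine borrowing the given segment index, L1
    /// directory, and log dictionary registry.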
pub fn new(
segment_index: &'a SegmentIndex,
l1_dir: &'a Path,
log_registry: &'a DictionaryRegistry,
) -> Self {
Self {
segment_index,
l1_dir,
log_registry,
}
}
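    /// Returns all metric samples for `series_id` within `range`, sorted by
    /// timestamp. Non-metric segments are skipped, and samples outside the
    /// range are dropped even when their segment overlaps it. An empty
    /// vector means no matching segments or samples.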
pub fn scan_metrics(
&self,
series_id: SeriesId,
range: &TimeRange,
) -> Result<Vec<(i64, f64)>, QueryError> {
let segments = self.segment_index.query(series_id, range);
if segments.is_empty() {
return Ok(Vec::new());
}
let mut all_samples = Vec::new();
for seg in &segments {
if seg.kind != SegmentKind::Metric {
continue;
}
let path = self.l1_dir.join(&seg.path);
let data = reader::read_metric_segment(&path)?;
for (ts, val) in data.samples {
if range.contains(ts) {
all_samples.push((ts, val));
}
}
}
all_samples.sort_by_key(|&(ts, _)| ts);
Ok(all_samples)
}
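    /// Computes an aggregation over all samples in `range` via
    /// `MetricAggregation::compute`; `Ok(None)` indicates there was
    /// nothing to aggregate.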
pub fn aggregate_metrics(
&self,
series_id: SeriesId,
range: &TimeRange,
) -> Result<Option<MetricAggregation>, QueryError> {
let samples = self.scan_metrics(series_id, range)?;
Ok(MetricAggregation::compute(&samples))
}
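    /// Scans `range` and downsamples the resulting samples into
    /// `window_ms`-wide windows via `reader::downsample`.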
pub fn downsample_metrics(
&self,
series_id: SeriesId,
range: &TimeRange,
window_ms: i64,
) -> Result<Vec<(i64, f64)>, QueryError> {
let samples = self.scan_metrics(series_id, range)?;
Ok(reader::downsample(&samples, window_ms))
}
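    /// Returns all log entries for `series_id` within `range`, sorted by
    /// timestamp. Metric segments are skipped; log segments are decoded
    /// through the shared dictionary registry.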
pub fn scan_logs(
&self,
series_id: SeriesId,
range: &TimeRange,
) -> Result<Vec<nodedb_types::timeseries::LogEntry>, QueryError> {
let segments = self.segment_index.query(series_id, range);
if segments.is_empty() {
return Ok(Vec::new());
}
let mut all_entries = Vec::new();
for seg in &segments {
if seg.kind != SegmentKind::Log {
continue;
}
let path = self.l1_dir.join(&seg.path);
let data = reader::read_log_segment(&path, self.log_registry)?;
for entry in data.entries {
if range.contains(entry.timestamp_ms) {
all_entries.push(entry);
}
}
}
all_entries.sort_by_key(|e| e.timestamp_ms);
Ok(all_entries)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::engine::timeseries::bucket::{BucketConfig, BucketManager};
use crate::engine::timeseries::gorilla::GorillaEncoder;
use crate::engine::timeseries::segment_index::SegmentIndex;
use nodedb_types::timeseries::{FlushedKind, FlushedSeries};
use tempfile::TempDir;
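    /// Flushes two 50-sample Gorilla-encoded metric segments for series 1
    /// (timestamps 1000..=5900 and 6000..=10900) to L1, then returns a
    /// segment index covering them plus an empty dictionary registry.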
fn setup_test_data(dir: &TempDir) -> (SegmentIndex, DictionaryRegistry) {
let config = BucketConfig {
l1_dir: dir.path().join("l1"),
l2_dir: dir.path().join("l2"),
..Default::default()
};
let mut mgr = BucketManager::new(config);
let mut enc1 = GorillaEncoder::new();
for i in 0..50 {
enc1.encode(1000 + i * 100, 10.0 + i as f64 * 0.5);
}
let mut enc2 = GorillaEncoder::new();
for i in 0..50 {
enc2.encode(6000 + i * 100, 50.0 + i as f64 * 0.3);
}
let flushed = vec![
FlushedSeries {
series_id: 1,
kind: FlushedKind::Metric {
gorilla_block: enc1.finish(),
sample_count: 50,
},
min_ts: 1000,
max_ts: 5900,
},
FlushedSeries {
series_id: 1,
kind: FlushedKind::Metric {
gorilla_block: enc2.finish(),
sample_count: 50,
},
min_ts: 6000,
max_ts: 10900,
},
];
mgr.flush_to_l1(flushed, None).unwrap();
let idx = {
let mut idx = SegmentIndex::new();
for seg in mgr.segment_index().query(1, &TimeRange::new(0, i64::MAX)) {
idx.add(1, seg.clone());
}
idx
};
(idx, DictionaryRegistry::new())
}
#[test]
fn scan_metrics_full_range() {
let dir = TempDir::new().unwrap();
let (idx, registry) = setup_test_data(&dir);
let l1_dir = dir.path().join("l1");
        let engine = TimeseriesQueryEngine::new(&idx, &l1_dir, &registry);
let samples = engine.scan_metrics(1, &TimeRange::new(0, 20000)).unwrap();
        assert_eq!(samples.len(), 100);
        for w in samples.windows(2) {
assert!(w[0].0 <= w[1].0);
}
}
#[test]
fn scan_metrics_partial_range() {
let dir = TempDir::new().unwrap();
let (idx, registry) = setup_test_data(&dir);
let l1_dir = dir.path().join("l1");
        let engine = TimeseriesQueryEngine::new(&idx, &l1_dir, &registry);
let samples = engine.scan_metrics(1, &TimeRange::new(2000, 4000)).unwrap();
assert!(!samples.is_empty());
for &(ts, _) in &samples {
assert!((2000..=4000).contains(&ts));
}
}
#[test]
fn aggregate_metrics_works() {
let dir = TempDir::new().unwrap();
let (idx, registry) = setup_test_data(&dir);
let l1_dir = dir.path().join("l1");
        let engine = TimeseriesQueryEngine::new(&idx, &l1_dir, &registry);
let agg = engine
.aggregate_metrics(1, &TimeRange::new(0, 20000))
.unwrap()
.unwrap();
assert_eq!(agg.count, 100);
assert!(agg.min < agg.max);
}
#[test]
fn downsample_reduces_points() {
let dir = TempDir::new().unwrap();
let (idx, registry) = setup_test_data(&dir);
let l1_dir = dir.path().join("l1");
        let engine = TimeseriesQueryEngine::new(&idx, &l1_dir, &registry);
let full = engine.scan_metrics(1, &TimeRange::new(0, 20000)).unwrap();
let downsampled = engine
.downsample_metrics(1, &TimeRange::new(0, 20000), 5000)
.unwrap();
assert!(downsampled.len() < full.len());
assert!(!downsampled.is_empty());
}
#[test]
fn empty_series_returns_empty() {
let dir = TempDir::new().unwrap();
let idx = SegmentIndex::new();
let registry = DictionaryRegistry::new();
let l1_dir = dir.path().join("l1");
        let engine = TimeseriesQueryEngine::new(&idx, &l1_dir, &registry);
let samples = engine
.scan_metrics(999, &TimeRange::new(0, 100000))
.unwrap();
assert!(samples.is_empty());
}
}