use crate::DbCore;
use crate::error::DbError;
use crate::query::{
compute_avg_over_time, compute_increase, compute_max_over_time, compute_min_over_time,
compute_rate, compute_sum_over_time,
};
use crate::query_surface::{
AggOp, EvalExpr, Grouping, InstantSelector, RangeFn, parse_eval_expr, parse_instant_selector,
series_matches_selector,
};
use http::StatusCode;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
/// Content type advertised by all Prometheus-compatible API endpoints.
pub const PROMETHEUS_API_CONTENT_TYPE: &str = "application/json";
/// Prometheus-style response envelope: `status` is `"success"` or `"error"`;
/// `data` is set on success, `error_type`/`error` on failure.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ApiEnvelope<T> {
    pub status: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<T>,
    // Explicit rename kept even though rename_all would already produce it.
    #[serde(skip_serializing_if = "Option::is_none", rename = "errorType")]
    pub error_type: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    // Never populated by this module today; reserved for future use.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub warnings: Option<Vec<String>>,
}
/// Wrap `data` in a `"success"` envelope with all error fields unset.
fn success<T: Serialize>(data: T) -> ApiEnvelope<T> {
    ApiEnvelope {
        data: Some(data),
        status: String::from("success"),
        error_type: None,
        error: None,
        warnings: None,
    }
}
/// Build an `"error"` envelope carrying an error-type tag and a message.
fn error_response(typ: &'static str, msg: String) -> ApiEnvelope<serde_json::Value> {
    ApiEnvelope {
        data: None,
        status: String::from("error"),
        error_type: Some(typ.to_owned()),
        error: Some(msg),
        warnings: None,
    }
}
/// Payload of a successful `/query` or `/query_range` response.
#[derive(Debug, Serialize, Deserialize)]
pub struct QueryData {
    // "vector" for instant queries, "matrix" for range queries.
    #[serde(rename = "resultType")]
    pub result_type: String,
    pub result: QueryResult,
}
/// Result body; untagged so it serializes as a bare JSON array either way.
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum QueryResult {
    Vector(Vec<VectorSample>),
    Matrix(Vec<MatrixSeries>),
}
/// One instant sample: label set plus a `[unix_seconds, "value"]` pair.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct VectorSample {
    pub metric: HashMap<String, String>,
    pub value: [serde_json::Value; 2],
}
/// One range-query series: label set plus its `[unix_seconds, "value"]` pairs.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct MatrixSeries {
    pub metric: HashMap<String, String>,
    pub values: Vec<[serde_json::Value; 2]>,
}
/// Render a sample value the way Prometheus' JSON API does:
/// `"NaN"`, `"+Inf"`, `"-Inf"` for non-finite values, decimal text otherwise.
fn format_sample_value(v: f64) -> String {
    if v.is_nan() {
        return "NaN".to_string();
    }
    if v.is_infinite() {
        let text = if v.is_sign_positive() { "+Inf" } else { "-Inf" };
        return text.to_string();
    }
    v.to_string()
}
/// Convert an integer nanosecond timestamp into fractional Unix seconds.
fn ns_to_sec(ns: u64) -> f64 {
    ns as f64 / 1e9
}
fn now_ns() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64
}
fn parse_time_param(s: Option<&str>) -> Result<u64, String> {
let s = match s {
Some(t) => t.trim(),
None => return Ok(now_ns()),
};
if s.is_empty() {
return Ok(now_ns());
}
if let Ok(secs) = s.parse::<f64>() {
if secs.is_finite() && secs >= 0.0 {
return Ok((secs * 1e9) as u64);
}
}
if let Ok(secs) = s.parse::<u64>() {
return Ok(secs * 1_000_000_000);
}
Err(format!(
"invalid time parameter (use Unix seconds): {:?}",
s
))
}
/// Parse a Prometheus `step` parameter into whole seconds.
///
/// Accepts a bare number of seconds ("30") or a number with a
/// case-insensitive `s`/`m`/`h`/`d` suffix ("5m", "1h").
///
/// # Errors
/// Rejects missing, empty, non-numeric, negative, non-finite, and
/// zero/sub-second steps. Previously a bare negative or NaN step was
/// silently mapped to `Ok(1)`, and `"0"`/`"0.5s"` yielded `Ok(0)` — a zero
/// step makes the range-evaluation loops (which advance `t` by the step)
/// spin forever.
fn parse_step_seconds(s: Option<&str>) -> Result<u64, String> {
    let s = match s {
        Some(t) => t.trim(),
        None => return Err("missing step parameter".to_string()),
    };
    if s.is_empty() {
        return Err("empty step parameter".to_string());
    }
    let s = ascii_lower(s);
    // Split off a trailing unit suffix; a bare number means seconds.
    let (num_str, mult) = match s.as_bytes().last().copied() {
        Some(b's') => (&s[..s.len() - 1], 1u64),
        Some(b'm') => (&s[..s.len() - 1], 60),
        Some(b'h') => (&s[..s.len() - 1], 3600),
        Some(b'd') => (&s[..s.len() - 1], 86400),
        _ => (s.as_str(), 1),
    };
    let n: f64 = num_str
        .parse()
        .map_err(|_| format!("invalid step: {:?}", s))?;
    if !n.is_finite() || n < 0.0 {
        return Err("step must be non-negative".to_string());
    }
    // Truncate to whole seconds (resolution of this API), then reject
    // anything that rounds down to zero.
    let secs = (n * mult as f64) as u64;
    if secs == 0 {
        return Err("step must be at least 1 second".to_string());
    }
    Ok(secs)
}
/// ASCII-lowercase a string; delegates to the standard library instead of
/// hand-rolling the per-char mapping.
fn ascii_lower(s: &str) -> String {
    s.to_ascii_lowercase()
}
/// Build a Prometheus label set for a series: `__name__` plus all its tags.
///
/// `__name__` is inserted first so that a tag literally named `__name__`
/// (however unlikely) would override it, matching the original insertion
/// order.
fn metric_from_series_and_tags(
    series: &str,
    tags: &HashMap<String, String>,
) -> HashMap<String, String> {
    let mut labels = HashMap::with_capacity(tags.len() + 1);
    labels.insert("__name__".to_string(), series.to_string());
    labels.extend(tags.iter().map(|(k, v)| (k.clone(), v.clone())));
    labels
}
/// Build the Prometheus `[unix_seconds, "value"]` JSON pair for one sample.
///
/// The timestamp is numeric; the value is a string, as the Prometheus API
/// requires. A non-representable timestamp (NaN/Inf) falls back to 0.
fn json_sample_pair(ts_ns: u64, value: f64) -> [serde_json::Value; 2] {
    let ts = serde_json::Number::from_f64(ns_to_sec(ts_ns))
        .unwrap_or_else(|| serde_json::Number::from(0));
    [
        serde_json::Value::Number(ts),
        serde_json::Value::String(format_sample_value(value)),
    ]
}
/// Transport-agnostic HTTP response produced by the handlers:
/// a status code plus the serialized JSON body bytes.
#[derive(Debug)]
pub struct PrometheusApiResponse {
    pub status: StatusCode,
    pub body: Vec<u8>,
}
impl PrometheusApiResponse {
fn ok_json<T: Serialize>(data: T) -> Self {
let body = serde_json::to_vec(&success(data)).expect("serialize success");
Self {
status: StatusCode::OK,
body,
}
}
fn err_json(status: StatusCode, error_type: &'static str, error: String) -> Self {
let body = serde_json::to_vec(&error_response(error_type, error)).expect("serialize error");
Self { status, body }
}
}
/// One instant-vector sample produced during evaluation.
struct EvalSample {
    metric: HashMap<String, String>,
    ts_ns: u64,
    value: f64,
}
/// One matrix series: a label set plus its (step timestamp, value) pairs.
struct EvalSeriesData {
    metric: HashMap<String, String>,
    steps: Vec<(u64, f64)>,
}
/// Evaluate `expr` as an instant vector at `time_ns`.
///
/// Recurses structurally: plain selectors and range functions hit storage
/// directly, while aggregations evaluate their inner expression first and
/// then combine the resulting samples.
fn eval_vector(
    expr: &EvalExpr,
    time_ns: u64,
    db: &Arc<DbCore>,
) -> Result<Vec<EvalSample>, DbError> {
    match expr {
        EvalExpr::Instant(selector) => eval_instant_at(selector, time_ns, db),
        EvalExpr::RangeFunction {
            func,
            selector,
            range,
        } => eval_range_fn_at(*func, selector, *range, time_ns, db),
        EvalExpr::Aggregation {
            op,
            inner,
            grouping,
        } => {
            let inner_samples = eval_vector(inner, time_ns, db)?;
            Ok(aggregate_samples(*op, inner_samples, grouping))
        }
    }
}
/// Evaluate a plain instant selector at `time_ns`.
///
/// Applies the selector's `offset` by shifting the eval time backwards, then
/// for each matching series returns the newest sample within a 1-second
/// lookback window ending at the eval time. Series with no sample in the
/// window, or that vanish between listing and querying, are skipped.
fn eval_instant_at(
    selector: &InstantSelector,
    time_ns: u64,
    db: &Arc<DbCore>,
) -> Result<Vec<EvalSample>, DbError> {
    let eval_time = match selector.offset {
        Some(off) => time_ns.saturating_sub(off.as_nanos() as u64),
        None => time_ns,
    };
    // Restrict to series whose name and tags match the selector.
    let series_keys: Vec<_> = db
        .list_series_keys()
        .into_iter()
        .filter(|(name, tags)| series_matches_selector(name, tags, &selector.selector))
        .collect();
    // Fetch the latest point in a 1s lookback window for one series.
    // The +1 makes the half-open storage range include eval_time itself.
    let run_one = |(series_name, tags): &(String, crate::types::TagSet)| -> Result<Option<EvalSample>, DbError> {
        let start = eval_time.saturating_sub(1_000_000_000);
        let end = eval_time.saturating_add(1);
        let points = db.query(series_name, start..end, Some(tags))?;
        match points.into_iter().max_by_key(|&(t, _)| t) {
            Some((ts, val)) => Ok(Some(EvalSample {
                metric: metric_from_series_and_tags(series_name, tags),
                ts_ns: ts,
                value: val,
            })),
            None => Ok(None),
        }
    };
    // Fan out across series, on the configured rayon pool when present.
    let results: Vec<Result<Option<EvalSample>, DbError>> = match db.get_query_pool() {
        Some(pool) => pool.install(|| series_keys.par_iter().map(run_one).collect()),
        None => series_keys.par_iter().map(run_one).collect(),
    };
    let mut out = Vec::new();
    for r in results {
        match r {
            Ok(Some(s)) => out.push(s),
            // A series deleted mid-query is treated the same as "no data".
            Ok(None) | Err(DbError::SeriesNotFound(_)) => {}
            Err(e) => return Err(e),
        }
    }
    Ok(out)
}
/// Evaluate `func(selector[range])` at a single instant.
///
/// Fetches each matching series' points over `[eval_time - range, eval_time]`
/// (after applying any selector offset) and reduces them with the range
/// function. The emitted sample carries the *requested* `time_ns`, not the
/// offset-adjusted time — consistent with Prometheus offset semantics.
fn eval_range_fn_at(
    func: RangeFn,
    selector: &InstantSelector,
    range: std::time::Duration,
    time_ns: u64,
    db: &Arc<DbCore>,
) -> Result<Vec<EvalSample>, DbError> {
    let range_ns = range.as_nanos() as u64;
    let eval_time = match selector.offset {
        Some(off) => time_ns.saturating_sub(off.as_nanos() as u64),
        None => time_ns,
    };
    let window_start = eval_time.saturating_sub(range_ns);
    // +1 so the half-open storage range includes eval_time itself.
    let window_end = eval_time.saturating_add(1);
    let series_keys: Vec<_> = db
        .list_series_keys()
        .into_iter()
        .filter(|(name, tags)| series_matches_selector(name, tags, &selector.selector))
        .collect();
    // Apply the range function over one series' window; None → no sample.
    let run_one = |(series_name, tags): &(String, crate::types::TagSet)| -> Result<Option<EvalSample>, DbError> {
        let points = db.query(series_name, window_start..window_end, Some(tags))?;
        let value = apply_range_fn(func, &points)?;
        Ok(value.map(|v| EvalSample {
            metric: metric_from_series_and_tags(series_name, tags),
            ts_ns: time_ns,
            value: v,
        }))
    };
    // Fan out across series, on the configured rayon pool when present.
    let results: Vec<Result<Option<EvalSample>, DbError>> = match db.get_query_pool() {
        Some(pool) => pool.install(|| series_keys.par_iter().map(run_one).collect()),
        None => series_keys.par_iter().map(run_one).collect(),
    };
    let mut out = Vec::new();
    for r in results {
        match r {
            Ok(Some(s)) => out.push(s),
            // A series deleted mid-query is treated the same as "no data".
            Ok(None) | Err(DbError::SeriesNotFound(_)) => {}
            Err(e) => return Err(e),
        }
    }
    Ok(out)
}
/// Apply a range function to the points collected for one window.
///
/// Returns `Ok(None)` when the underlying `compute_*` helper yields no
/// value (presumably for empty/insufficient windows — decided by those
/// helpers). The `Result` wrapper is always `Ok` here; it exists for
/// signature symmetry with the storage-touching code paths.
fn apply_range_fn(
    func: RangeFn,
    points: &[(crate::types::Timestamp, crate::types::Value)],
) -> Result<Option<f64>, DbError> {
    let result = match func {
        RangeFn::Rate => compute_rate(points),
        RangeFn::Increase => compute_increase(points),
        RangeFn::AvgOverTime => compute_avg_over_time(points),
        RangeFn::MaxOverTime => compute_max_over_time(points),
        RangeFn::MinOverTime => compute_min_over_time(points),
        RangeFn::SumOverTime => compute_sum_over_time(points),
    };
    Ok(result)
}
/// Per-group accumulator: (output label set, collected values, newest ts).
type AggGroupEntry = (HashMap<String, String>, Vec<f64>, u64);
/// Group `samples` by the labels selected in `grouping` and fold each
/// group's values with `op`. A group's output timestamp is the newest
/// timestamp observed among its members.
fn aggregate_samples(op: AggOp, samples: Vec<EvalSample>, grouping: &Grouping) -> Vec<EvalSample> {
    let mut groups: HashMap<String, AggGroupEntry> = HashMap::new();
    for EvalSample { metric, ts_ns, value } in samples {
        let key = group_key_string(&metric, grouping);
        let (_, values, latest) = groups
            .entry(key)
            .or_insert_with(|| (group_metric(&metric, grouping), Vec::new(), ts_ns));
        values.push(value);
        *latest = (*latest).max(ts_ns);
    }
    let mut out = Vec::with_capacity(groups.len());
    for (metric, values, ts_ns) in groups.into_values() {
        out.push(EvalSample {
            metric,
            ts_ns,
            value: apply_agg_op(op, &values),
        });
    }
    out
}
/// Fold a group's values with the aggregation operator.
///
/// Empty groups yield 0 for sum/count and NaN for avg/min/max.
fn apply_agg_op(op: AggOp, values: &[f64]) -> f64 {
    if values.is_empty() {
        return match op {
            AggOp::Sum | AggOp::Count => 0.0,
            _ => f64::NAN,
        };
    }
    let n = values.len() as f64;
    match op {
        AggOp::Sum => values.iter().sum(),
        AggOp::Count => n,
        AggOp::Avg => values.iter().sum::<f64>() / n,
        // reduce with f64::min/max preserves the original's NaN behavior.
        AggOp::Min => values.iter().copied().reduce(f64::min).expect("non-empty"),
        AggOp::Max => values.iter().copied().reduce(f64::max).expect("non-empty"),
    }
}
/// Canonical string key for a sample's group: the kept label pairs sorted
/// by name and rendered as `k=v` joined with commas.
fn group_key_string(metric: &HashMap<String, String>, grouping: &Grouping) -> String {
    let mut pairs: Vec<(&String, &String)> = metric
        .iter()
        .filter(|(k, _)| match grouping {
            Grouping::By(labels) => labels.contains(k),
            Grouping::Without(labels) => !labels.contains(k),
        })
        .collect();
    pairs.sort_by_key(|(k, _)| *k);
    let mut key = String::new();
    for (i, (k, v)) in pairs.into_iter().enumerate() {
        if i > 0 {
            key.push(',');
        }
        key.push_str(k);
        key.push('=');
        key.push_str(v);
    }
    key
}
/// Project a sample's label set down to the labels kept by `grouping`:
/// only the listed labels for `by(...)`, everything else for `without(...)`.
fn group_metric(metric: &HashMap<String, String>, grouping: &Grouping) -> HashMap<String, String> {
    metric
        .iter()
        .filter(|(k, _)| match grouping {
            Grouping::By(labels) => labels.contains(k),
            Grouping::Without(labels) => !labels.contains(k),
        })
        .map(|(k, v)| (k.clone(), v.clone()))
        .collect()
}
/// Evaluate `expr` over a step grid for `/query_range`, producing one
/// `EvalSeriesData` per output series.
///
/// Mirrors `eval_vector`'s structural recursion: selectors and range
/// functions are evaluated per step against storage; aggregations combine
/// the inner matrix step-by-step.
fn eval_matrix(
    expr: &EvalExpr,
    start_ns: u64,
    end_ns: u64,
    step_ns: u64,
    db: &Arc<DbCore>,
) -> Result<Vec<EvalSeriesData>, DbError> {
    match expr {
        EvalExpr::Instant(selector) => eval_instant_matrix(selector, start_ns, end_ns, step_ns, db),
        EvalExpr::RangeFunction {
            func,
            selector,
            range,
        } => eval_range_fn_matrix(*func, selector, *range, start_ns, end_ns, step_ns, db),
        EvalExpr::Aggregation {
            op,
            inner,
            grouping,
        } => {
            let inner_data = eval_matrix(inner, start_ns, end_ns, step_ns, db)?;
            Ok(aggregate_matrix(
                *op, inner_data, grouping, start_ns, end_ns, step_ns,
            ))
        }
    }
}
/// Range-evaluate a plain selector: at each step timestamp, emit the last
/// sample at or before that (offset-adjusted) time.
///
/// NOTE(review): unlike the instant path's 1-second lookback, this looks
/// back arbitrarily far within the fetched window. A zero `step_ns` would
/// keep `t` from advancing and loop forever — callers must validate the
/// step before getting here.
fn eval_instant_matrix(
    selector: &InstantSelector,
    start_ns: u64,
    end_ns: u64,
    step_ns: u64,
    db: &Arc<DbCore>,
) -> Result<Vec<EvalSeriesData>, DbError> {
    let off_ns = selector.offset.map(|d| d.as_nanos() as u64).unwrap_or(0);
    let query_start = start_ns.saturating_sub(off_ns);
    // +1 so the half-open storage range includes the final step's time.
    let query_end = end_ns.saturating_sub(off_ns).saturating_add(1);
    let series_keys: Vec<_> = db
        .list_series_keys()
        .into_iter()
        .filter(|(name, tags)| series_matches_selector(name, tags, &selector.selector))
        .collect();
    // Build the per-step value list for one series.
    let run_range = |(series_name, tags): &(String, crate::types::TagSet)| -> Result<Option<EvalSeriesData>, DbError> {
        let points = db.query(series_name, query_start..query_end, Some(tags))?;
        let mut steps = Vec::new();
        let mut t = start_ns;
        while t <= end_ns {
            let eval_ts = t.saturating_sub(off_ns);
            // Latest point at or before this step's evaluation time.
            if let Some((_, val)) = points
                .iter()
                .filter(|(pt, _)| *pt <= eval_ts)
                .max_by_key(|(pt, _)| *pt)
            {
                steps.push((t, *val));
            }
            t = t.saturating_add(step_ns);
        }
        if steps.is_empty() {
            return Ok(None);
        }
        Ok(Some(EvalSeriesData {
            metric: metric_from_series_and_tags(series_name, tags),
            steps,
        }))
    };
    // Fan out across series, on the configured rayon pool when present.
    let results: Vec<Result<Option<EvalSeriesData>, DbError>> = match db.get_query_pool() {
        Some(pool) => pool.install(|| series_keys.par_iter().map(run_range).collect()),
        None => series_keys.par_iter().map(run_range).collect(),
    };
    let mut out = Vec::new();
    for r in results {
        match r {
            Ok(Some(s)) => out.push(s),
            // A series deleted mid-query is treated the same as "no data".
            Ok(None) | Err(DbError::SeriesNotFound(_)) => {}
            Err(e) => return Err(e),
        }
    }
    Ok(out)
}
/// Range-evaluate `func(selector[range])`: fetch each series' points once
/// covering `[start - offset - range, end - offset]`, then slide the window
/// across the step grid, applying the range function at every step.
///
/// NOTE(review): like `eval_instant_matrix`, a zero `step_ns` would loop
/// forever here — callers must validate the step first.
fn eval_range_fn_matrix(
    func: RangeFn,
    selector: &InstantSelector,
    range: std::time::Duration,
    start_ns: u64,
    end_ns: u64,
    step_ns: u64,
    db: &Arc<DbCore>,
) -> Result<Vec<EvalSeriesData>, DbError> {
    let range_ns = range.as_nanos() as u64;
    let off_ns = selector.offset.map(|d| d.as_nanos() as u64).unwrap_or(0);
    // One storage fetch spanning every step's window.
    let fetch_start = start_ns.saturating_sub(off_ns).saturating_sub(range_ns);
    let fetch_end = end_ns.saturating_sub(off_ns).saturating_add(1);
    let series_keys: Vec<_> = db
        .list_series_keys()
        .into_iter()
        .filter(|(name, tags)| series_matches_selector(name, tags, &selector.selector))
        .collect();
    // Apply the range function per step for one series.
    let run_range = |(series_name, tags): &(String, crate::types::TagSet)| -> Result<Option<EvalSeriesData>, DbError> {
        let all_points = db.query(series_name, fetch_start..fetch_end, Some(tags))?;
        let mut steps = Vec::new();
        let mut t = start_ns;
        while t <= end_ns {
            let eval_ts = t.saturating_sub(off_ns);
            let window_start = eval_ts.saturating_sub(range_ns);
            // Inclusive window [window_start, eval_ts] for this step.
            let window_points: Vec<_> = all_points
                .iter()
                .filter(|(pt, _)| *pt >= window_start && *pt <= eval_ts)
                .copied()
                .collect();
            if let Ok(Some(val)) = apply_range_fn(func, &window_points) {
                steps.push((t, val));
            }
            t = t.saturating_add(step_ns);
        }
        if steps.is_empty() {
            return Ok(None);
        }
        Ok(Some(EvalSeriesData {
            metric: metric_from_series_and_tags(series_name, tags),
            steps,
        }))
    };
    // Fan out across series, on the configured rayon pool when present.
    let results: Vec<Result<Option<EvalSeriesData>, DbError>> = match db.get_query_pool() {
        Some(pool) => pool.install(|| series_keys.par_iter().map(run_range).collect()),
        None => series_keys.par_iter().map(run_range).collect(),
    };
    let mut out = Vec::new();
    for r in results {
        match r {
            Ok(Some(s)) => out.push(s),
            // A series deleted mid-query is treated the same as "no data".
            Ok(None) | Err(DbError::SeriesNotFound(_)) => {}
            Err(e) => return Err(e),
        }
    }
    Ok(out)
}
/// Aggregate an inner matrix step-by-step.
///
/// Series are grouped by the `grouping` labels; at each step timestamp the
/// values of all members that produced a sample at exactly that step are
/// folded with `op`. Steps with no member values are omitted, and empty
/// groups produce no output series.
///
/// NOTE(review): the per-step `find` scans each member's step list linearly
/// (O(steps²) per series) — acceptable for modest step counts, but worth
/// revisiting if grids get large.
fn aggregate_matrix(
    op: AggOp,
    inner: Vec<EvalSeriesData>,
    grouping: &Grouping,
    start_ns: u64,
    end_ns: u64,
    step_ns: u64,
) -> Vec<EvalSeriesData> {
    let mut groups: HashMap<String, (HashMap<String, String>, Vec<&EvalSeriesData>)> =
        HashMap::new();
    for series in &inner {
        let key = group_key_string(&series.metric, grouping);
        let entry = groups
            .entry(key)
            .or_insert_with(|| (group_metric(&series.metric, grouping), Vec::new()));
        entry.1.push(series);
    }
    let mut result = Vec::new();
    for (_, (group_metric, group_series)) in groups {
        let mut steps = Vec::new();
        let mut t = start_ns;
        while t <= end_ns {
            // Members' values at exactly this step timestamp.
            let values: Vec<f64> = group_series
                .iter()
                .filter_map(|s| s.steps.iter().find(|(ts, _)| *ts == t).map(|(_, v)| *v))
                .collect();
            if !values.is_empty() {
                steps.push((t, apply_agg_op(op, &values)));
            }
            t = t.saturating_add(step_ns);
        }
        if !steps.is_empty() {
            result.push(EvalSeriesData {
                metric: group_metric,
                steps,
            });
        }
    }
    result
}
/// Handle `/api/v1/query`: evaluate `query` as an instant vector at `time`.
///
/// Returns 400 for a missing query or bad time parameter, 422 for an
/// unparsable expression, 500 for storage errors, and 200 with a
/// `resultType: "vector"` payload otherwise.
pub fn handle_query(
    query_param: Option<&str>,
    time_param: Option<&str>,
    db: &Arc<DbCore>,
) -> PrometheusApiResponse {
    let query = match query_param {
        Some(q) if !q.trim().is_empty() => q.trim(),
        _ => {
            return PrometheusApiResponse::err_json(
                StatusCode::BAD_REQUEST,
                "bad_data",
                "missing query parameter".to_string(),
            );
        }
    };
    let expr = match parse_eval_expr(query) {
        Ok(e) => e,
        Err(e) => {
            return PrometheusApiResponse::err_json(
                StatusCode::UNPROCESSABLE_ENTITY,
                "bad_data",
                e,
            );
        }
    };
    // Missing time defaults to "now" inside parse_time_param.
    let time_ns = match parse_time_param(time_param) {
        Ok(t) => t,
        Err(e) => {
            return PrometheusApiResponse::err_json(StatusCode::BAD_REQUEST, "bad_data", e);
        }
    };
    let samples = match eval_vector(&expr, time_ns, db) {
        Ok(s) => s,
        Err(e) => {
            return PrometheusApiResponse::err_json(
                StatusCode::INTERNAL_SERVER_ERROR,
                "internal",
                e.to_string(),
            );
        }
    };
    // Convert evaluated samples into the Prometheus vector wire format.
    let vector: Vec<VectorSample> = samples
        .into_iter()
        .map(|s| VectorSample {
            metric: s.metric,
            value: json_sample_pair(s.ts_ns, s.value),
        })
        .collect();
    PrometheusApiResponse::ok_json(QueryData {
        result_type: "vector".to_string(),
        result: QueryResult::Vector(vector),
    })
}
/// Handle `/api/v1/query_range`: evaluate `query` over `[start, end]` at
/// `step` resolution and return a `resultType: "matrix"` payload.
///
/// Returns 400 for missing/invalid start, end, or step — including a step
/// that works out to zero nanoseconds, which would otherwise make the
/// step loops in the matrix evaluators spin forever — 422 for an
/// unparsable expression, and 500 for storage errors.
pub fn handle_query_range(
    query_param: Option<&str>,
    start_param: Option<&str>,
    end_param: Option<&str>,
    step_param: Option<&str>,
    db: &Arc<DbCore>,
) -> PrometheusApiResponse {
    let query = match query_param {
        Some(q) if !q.trim().is_empty() => q.trim(),
        _ => {
            return PrometheusApiResponse::err_json(
                StatusCode::BAD_REQUEST,
                "bad_data",
                "missing query parameter".to_string(),
            );
        }
    };
    let start_ns = match start_param {
        Some(s) => match parse_time_param(Some(s)) {
            Ok(t) => t,
            Err(e) => {
                return PrometheusApiResponse::err_json(StatusCode::BAD_REQUEST, "bad_data", e);
            }
        },
        None => {
            return PrometheusApiResponse::err_json(
                StatusCode::BAD_REQUEST,
                "bad_data",
                "missing start parameter".to_string(),
            );
        }
    };
    let end_ns = match end_param {
        Some(s) => match parse_time_param(Some(s)) {
            Ok(t) => t,
            Err(e) => {
                return PrometheusApiResponse::err_json(StatusCode::BAD_REQUEST, "bad_data", e);
            }
        },
        None => {
            return PrometheusApiResponse::err_json(
                StatusCode::BAD_REQUEST,
                "bad_data",
                "missing end parameter".to_string(),
            );
        }
    };
    if start_ns >= end_ns {
        return PrometheusApiResponse::err_json(
            StatusCode::BAD_REQUEST,
            "bad_data",
            "start must be before end".to_string(),
        );
    }
    let step_secs = match parse_step_seconds(step_param) {
        Ok(s) => s,
        Err(e) => {
            return PrometheusApiResponse::err_json(StatusCode::BAD_REQUEST, "bad_data", e);
        }
    };
    // Saturate rather than overflow for absurdly large steps. A zero step
    // must be rejected here: the matrix evaluators advance their step
    // cursor by `step_ns` and would never terminate.
    let step_ns = step_secs.saturating_mul(1_000_000_000);
    if step_ns == 0 {
        return PrometheusApiResponse::err_json(
            StatusCode::BAD_REQUEST,
            "bad_data",
            "step must be greater than zero".to_string(),
        );
    }
    let expr = match parse_eval_expr(query) {
        Ok(e) => e,
        Err(e) => {
            return PrometheusApiResponse::err_json(
                StatusCode::UNPROCESSABLE_ENTITY,
                "bad_data",
                e,
            );
        }
    };
    let eval_data = match eval_matrix(&expr, start_ns, end_ns, step_ns, db) {
        Ok(d) => d,
        Err(e) => {
            return PrometheusApiResponse::err_json(
                StatusCode::INTERNAL_SERVER_ERROR,
                "internal",
                e.to_string(),
            );
        }
    };
    // Convert evaluated series into the Prometheus matrix wire format.
    let matrix: Vec<MatrixSeries> = eval_data
        .into_iter()
        .map(|sd| MatrixSeries {
            metric: sd.metric,
            values: sd
                .steps
                .into_iter()
                .map(|(ts, val)| json_sample_pair(ts, val))
                .collect(),
        })
        .collect();
    PrometheusApiResponse::ok_json(QueryData {
        result_type: "matrix".to_string(),
        result: QueryResult::Matrix(matrix),
    })
}
/// Handle `/api/v1/labels`: return every known label name, sorted.
///
/// Always includes `__name__` plus the union of all tag keys across series.
pub fn handle_labels(db: &Arc<DbCore>) -> PrometheusApiResponse {
    let mut names = std::collections::HashSet::new();
    names.insert("__name__".to_string());
    for (_, tags) in db.list_series_keys() {
        names.extend(tags.keys().cloned());
    }
    let mut data: Vec<String> = names.into_iter().collect();
    data.sort();
    PrometheusApiResponse::ok_json(data)
}
/// Handle `/api/v1/label/<name>/values`: return the sorted distinct values
/// of one label. `__name__` maps to the set of series names; any other
/// label collects its value from every series carrying that tag.
pub fn handle_label_values(label_name: &str, db: &Arc<DbCore>) -> PrometheusApiResponse {
    let values: std::collections::HashSet<String> = if label_name == "__name__" {
        db.list_series_names().into_iter().collect()
    } else {
        db.list_series_keys()
            .into_iter()
            .filter_map(|(_, tags)| tags.get(label_name).cloned())
            .collect()
    };
    let mut data: Vec<String> = values.into_iter().collect();
    data.sort();
    PrometheusApiResponse::ok_json(data)
}
/// Handle `/api/v1/series`: return the label sets of all series matching
/// any of the `match[]` selectors. Start/end are currently ignored.
///
/// Improvements over the previous version: the series key list is fetched
/// once instead of once per matcher, and de-duplication across matchers
/// uses a `HashSet` of sorted label pairs instead of a linear scan over
/// every previously emitted label set (which was O(n²)).
///
/// Unparsable matchers are still skipped silently (best-effort), preserving
/// prior behavior.
pub fn handle_series(
    match_params: &[String],
    _start_param: Option<&str>,
    _end_param: Option<&str>,
    db: &Arc<DbCore>,
) -> PrometheusApiResponse {
    if match_params.is_empty() {
        return PrometheusApiResponse::err_json(
            StatusCode::BAD_REQUEST,
            "bad_data",
            "at least one match[] parameter is required".to_string(),
        );
    }
    // Fetch the key list once; it does not change per matcher.
    let all_keys = db.list_series_keys();
    // Sorted (key, value) pairs are a hashable, exact identity for a label set.
    let mut seen: std::collections::HashSet<Vec<(String, String)>> =
        std::collections::HashSet::new();
    let mut series_list: Vec<HashMap<String, String>> = Vec::new();
    for m in match_params {
        let m = m.trim();
        if m.is_empty() {
            continue;
        }
        let instant = match parse_instant_selector(m) {
            Ok(s) => s,
            Err(_) => continue,
        };
        for (series_name, tags) in &all_keys {
            if series_matches_selector(series_name, tags, &instant.selector) {
                let labels = metric_from_series_and_tags(series_name, tags);
                let mut key: Vec<(String, String)> = labels
                    .iter()
                    .map(|(k, v)| (k.clone(), v.clone()))
                    .collect();
                key.sort();
                // insert() is false for duplicates → each label set emitted once.
                if seen.insert(key) {
                    series_list.push(labels);
                }
            }
        }
    }
    PrometheusApiResponse::ok_json(series_list)
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
    // Fixture: DB with three points on "http_requests_total" across two
    // "job" tag values (api: t=1s/10.0 and t=2s/20.0; web: t=1.5s/15.0).
    fn make_db_with_series() -> (Arc<DbCore>, tempfile::TempDir) {
        make_db_with_series_and_parallelism(None)
    }
    // Same fixture, with an optional cap on query parallelism. The TempDir
    // is returned so its Drop doesn't delete the data directory early.
    fn make_db_with_series_and_parallelism(
        query_max_parallel_series: Option<usize>,
    ) -> (Arc<DbCore>, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let config = crate::DbConfig {
            data_dir: dir.path().to_path_buf(),
            max_series_cardinality: Some(1000),
            query_max_parallel_series,
            ..Default::default()
        };
        let mut db = DbCore::with_config(config).unwrap();
        db.recover().unwrap();
        let db = Arc::new(db);
        db.insert(
            "http_requests_total",
            1_000_000_000,
            10.0,
            [("job".to_string(), "api".to_string())]
                .into_iter()
                .collect(),
        )
        .unwrap();
        db.insert(
            "http_requests_total",
            2_000_000_000,
            20.0,
            [("job".to_string(), "api".to_string())]
                .into_iter()
                .collect(),
        )
        .unwrap();
        db.insert(
            "http_requests_total",
            1_500_000_000,
            15.0,
            [("job".to_string(), "web".to_string())]
                .into_iter()
                .collect(),
        )
        .unwrap();
        db.flush().unwrap();
        (db, dir)
    }
    // Fixture for aggregation tests: three series (job=api/instance=a,
    // job=api/instance=b, job=web/instance=c), five points each at t=1..5s.
    fn make_db_for_aggregation() -> (Arc<DbCore>, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let config = crate::DbConfig {
            data_dir: dir.path().to_path_buf(),
            max_series_cardinality: Some(1000),
            ..Default::default()
        };
        let mut db = DbCore::with_config(config).unwrap();
        db.recover().unwrap();
        let db = Arc::new(db);
        for i in 0..5 {
            db.insert(
                "http_requests_total",
                (i + 1) * 1_000_000_000,
                (i as f64 + 1.0) * 10.0,
                [
                    ("job".to_string(), "api".to_string()),
                    ("instance".to_string(), "a".to_string()),
                ]
                .into_iter()
                .collect(),
            )
            .unwrap();
        }
        for i in 0..5 {
            db.insert(
                "http_requests_total",
                (i + 1) * 1_000_000_000,
                (i as f64 + 1.0) * 5.0,
                [
                    ("job".to_string(), "api".to_string()),
                    ("instance".to_string(), "b".to_string()),
                ]
                .into_iter()
                .collect(),
            )
            .unwrap();
        }
        for i in 0..5 {
            db.insert(
                "http_requests_total",
                (i + 1) * 1_000_000_000,
                (i as f64 + 1.0) * 3.0,
                [
                    ("job".to_string(), "web".to_string()),
                    ("instance".to_string(), "c".to_string()),
                ]
                .into_iter()
                .collect(),
            )
            .unwrap();
        }
        db.flush().unwrap();
        (db, dir)
    }
    // Deterministic sort key for a label map: sorted "k=v" pairs joined.
    fn metric_sort_key(m: &HashMap<String, String>) -> String {
        let mut pairs: Vec<_> = m.iter().collect();
        pairs.sort_by_key(|(k, _)| *k);
        pairs
            .into_iter()
            .map(|(k, v)| format!("{}={}", k, v))
            .collect::<Vec<_>>()
            .join(",")
    }
    // Order results deterministically before comparing across runs.
    fn sort_vector_samples(samples: &mut [VectorSample]) {
        samples.sort_by_key(|a| metric_sort_key(&a.metric));
    }
    fn sort_matrix_series(series: &mut [MatrixSeries]) {
        series.sort_by_key(|a| metric_sort_key(&a.metric));
    }
#[test]
fn query_missing_param_returns_400() {
let (db, _guard) = make_db_with_series();
let r = handle_query(None, None, &db);
assert_eq!(r.status, StatusCode::BAD_REQUEST);
let s = String::from_utf8(r.body).unwrap();
let body: ApiEnvelope<serde_json::Value> = serde_json::from_str(&s).unwrap();
assert_eq!(body.status.as_str(), "error");
assert!(body.error.unwrap().contains("query"));
}
#[test]
fn query_valid_selector_returns_vector() {
let (db, _guard) = make_db_with_series();
let r = handle_query(Some("http_requests_total"), Some("2"), &db);
assert_eq!(r.status, StatusCode::OK);
let s = String::from_utf8(r.body).unwrap();
let body: ApiEnvelope<QueryData> = serde_json::from_str(&s).unwrap();
assert_eq!(body.status.as_str(), "success");
let data = body.data.unwrap();
assert_eq!(data.result_type.as_str(), "vector");
let QueryResult::Vector(samples) = data.result else {
panic!("expected vector")
};
assert!(!samples.is_empty());
let has_api = samples
.iter()
.any(|s| s.metric.get("job") == Some(&"api".to_string()));
let has_web = samples
.iter()
.any(|s| s.metric.get("job") == Some(&"web".to_string()));
assert!(has_api || has_web);
}
#[test]
fn labels_returns_name_and_known_labels() {
let (db, _guard) = make_db_with_series();
let r = handle_labels(&db);
assert_eq!(r.status, StatusCode::OK);
let s = String::from_utf8(r.body).unwrap();
let body: ApiEnvelope<Vec<String>> = serde_json::from_str(&s).unwrap();
assert_eq!(body.status.as_str(), "success");
let data = body.data.unwrap();
assert!(data.contains(&"__name__".to_string()));
assert!(data.contains(&"job".to_string()));
}
#[test]
fn label_values_name_returns_metric_names() {
let (db, _guard) = make_db_with_series();
let r = handle_label_values("__name__", &db);
assert_eq!(r.status, StatusCode::OK);
let s = String::from_utf8(r.body).unwrap();
let body: ApiEnvelope<Vec<String>> = serde_json::from_str(&s).unwrap();
assert_eq!(body.status.as_str(), "success");
assert!(
body.data
.unwrap()
.contains(&"http_requests_total".to_string())
);
}
#[test]
fn series_requires_match_returns_400_without() {
let (db, _guard) = make_db_with_series();
let r = handle_series(&[], None, None, &db);
assert_eq!(r.status, StatusCode::BAD_REQUEST);
}
#[test]
fn series_with_match_returns_label_sets() {
let (db, _guard) = make_db_with_series();
let r = handle_series(&["http_requests_total".to_string()], None, None, &db);
assert_eq!(r.status, StatusCode::OK);
let s = String::from_utf8(r.body).unwrap();
let body: ApiEnvelope<Vec<HashMap<String, String>>> = serde_json::from_str(&s).unwrap();
assert_eq!(body.status.as_str(), "success");
let data = body.data.unwrap();
assert!(!data.is_empty());
assert!(
data.iter()
.any(|m| m.get("__name__") == Some(&"http_requests_total".to_string()))
);
}
#[test]
fn query_range_valid_returns_matrix() {
let (db, _guard) = make_db_with_series();
let r = handle_query_range(
Some("http_requests_total"),
Some("1"),
Some("3"),
Some("1s"),
&db,
);
assert_eq!(r.status, StatusCode::OK);
let s = String::from_utf8(r.body).unwrap();
let body: ApiEnvelope<QueryData> = serde_json::from_str(&s).unwrap();
assert_eq!(body.status.as_str(), "success");
let data = body.data.unwrap();
assert_eq!(data.result_type.as_str(), "matrix");
let QueryResult::Matrix(series) = data.result else {
panic!("expected matrix")
};
assert!(!series.is_empty());
}
#[test]
fn query_surface_rejects_aggregation() {
let e = parse_instant_selector("sum(rate(x[5m]))").unwrap_err();
assert!(
e.contains("aggregation") || e.contains("function") || e.contains("not yet supported")
);
}
#[test]
fn query_vector_selector_regex_matcher() {
let (db, _guard) = make_db_with_series();
let r = handle_query(
Some(r#"http_requests_total{job=~"api|web"}"#),
Some("2"),
&db,
);
assert_eq!(r.status, StatusCode::OK);
let s = String::from_utf8(r.body).unwrap();
let body: ApiEnvelope<QueryData> = serde_json::from_str(&s).unwrap();
assert_eq!(body.status.as_str(), "success");
let QueryResult::Vector(samples) = body.data.unwrap().result else {
panic!("expected vector")
};
assert_eq!(samples.len(), 2, "api and web both match =~");
}
#[test]
fn query_vector_selector_not_equal_matcher() {
let (db, _guard) = make_db_with_series();
let r = handle_query(Some(r#"http_requests_total{job!="web"}"#), Some("2"), &db);
assert_eq!(r.status, StatusCode::OK);
let s = String::from_utf8(r.body).unwrap();
let body: ApiEnvelope<QueryData> = serde_json::from_str(&s).unwrap();
assert_eq!(body.status.as_str(), "success");
let QueryResult::Vector(samples) = body.data.unwrap().result else {
panic!("expected vector")
};
assert_eq!(samples.len(), 1);
assert_eq!(samples[0].metric.get("job"), Some(&"api".to_string()));
}
#[test]
fn query_parallelism_1_same_result_as_default() {
let (db_default, _g0) = make_db_with_series_and_parallelism(None);
let (db_pool1, _g1) = make_db_with_series_and_parallelism(Some(1));
let r0 = handle_query(Some("http_requests_total"), Some("2"), &db_default);
let r1 = handle_query(Some("http_requests_total"), Some("2"), &db_pool1);
assert_eq!(r0.status, StatusCode::OK);
assert_eq!(r1.status, StatusCode::OK);
let body0: ApiEnvelope<QueryData> = serde_json::from_slice(&r0.body).unwrap();
let body1: ApiEnvelope<QueryData> = serde_json::from_slice(&r1.body).unwrap();
let QueryResult::Vector(mut v0) = body0.data.unwrap().result else {
panic!("vector")
};
let QueryResult::Vector(mut v1) = body1.data.unwrap().result else {
panic!("vector")
};
sort_vector_samples(&mut v0);
sort_vector_samples(&mut v1);
assert_eq!(v0.len(), v1.len(), "same number of series");
assert_eq!(v0, v1, "vector result identical with pool(1) vs default");
}
#[test]
fn query_parallelism_2_same_result_as_default() {
let (db_default, _g0) = make_db_with_series_and_parallelism(None);
let (db_pool2, _g1) = make_db_with_series_and_parallelism(Some(2));
let r0 = handle_query(
Some(r#"http_requests_total{job=~"api|web"}"#),
Some("2"),
&db_default,
);
let r1 = handle_query(
Some(r#"http_requests_total{job=~"api|web"}"#),
Some("2"),
&db_pool2,
);
assert_eq!(r0.status, StatusCode::OK);
assert_eq!(r1.status, StatusCode::OK);
let body0: ApiEnvelope<QueryData> = serde_json::from_slice(&r0.body).unwrap();
let body1: ApiEnvelope<QueryData> = serde_json::from_slice(&r1.body).unwrap();
let QueryResult::Vector(mut v0) = body0.data.unwrap().result else {
panic!("vector")
};
let QueryResult::Vector(mut v1) = body1.data.unwrap().result else {
panic!("vector")
};
sort_vector_samples(&mut v0);
sort_vector_samples(&mut v1);
assert_eq!(v0, v1, "vector result identical with pool(2) vs default");
}
#[test]
fn query_range_parallelism_1_same_result_as_default() {
let (db_default, _g0) = make_db_with_series_and_parallelism(None);
let (db_pool1, _g1) = make_db_with_series_and_parallelism(Some(1));
let r0 = handle_query_range(
Some("http_requests_total"),
Some("1"),
Some("3"),
Some("1s"),
&db_default,
);
let r1 = handle_query_range(
Some("http_requests_total"),
Some("1"),
Some("3"),
Some("1s"),
&db_pool1,
);
assert_eq!(r0.status, StatusCode::OK);
assert_eq!(r1.status, StatusCode::OK);
let body0: ApiEnvelope<QueryData> = serde_json::from_slice(&r0.body).unwrap();
let body1: ApiEnvelope<QueryData> = serde_json::from_slice(&r1.body).unwrap();
let QueryResult::Matrix(mut m0) = body0.data.unwrap().result else {
panic!("matrix")
};
let QueryResult::Matrix(mut m1) = body1.data.unwrap().result else {
panic!("matrix")
};
sort_matrix_series(&mut m0);
sort_matrix_series(&mut m1);
assert_eq!(m0.len(), m1.len());
assert_eq!(m0, m1, "matrix result identical with pool(1) vs default");
}
#[test]
fn query_parallelism_empty_selector_returns_empty_vector() {
let (db, _guard) = make_db_with_series_and_parallelism(Some(1));
let r = handle_query(Some(r#"nonexistent_metric{job="none"}"#), Some("2"), &db);
assert_eq!(r.status, StatusCode::OK);
let body: ApiEnvelope<QueryData> = serde_json::from_slice(&r.body).unwrap();
let QueryResult::Vector(samples) = body.data.unwrap().result else {
panic!("vector")
};
assert!(samples.is_empty(), "no series match → empty vector");
}
#[test]
fn query_rate_returns_vector() {
let (db, _guard) = make_db_with_series();
let r = handle_query(Some("rate(http_requests_total[5m])"), Some("2"), &db);
assert_eq!(r.status, StatusCode::OK);
let body: ApiEnvelope<QueryData> = serde_json::from_slice(&r.body).unwrap();
assert_eq!(body.status.as_str(), "success");
let QueryResult::Vector(samples) = body.data.unwrap().result else {
panic!("expected vector")
};
assert!(
!samples.is_empty(),
"rate should produce results for matching series"
);
for s in &samples {
let val: f64 = s.value[1].as_str().unwrap().parse().unwrap();
assert!(
val >= 0.0,
"rate should be non-negative for monotonic counter"
);
}
}
#[test]
fn query_range_rate_returns_matrix() {
let (db, _guard) = make_db_with_series();
let r = handle_query_range(
Some("rate(http_requests_total[2s])"),
Some("1"),
Some("3"),
Some("1s"),
&db,
);
assert_eq!(r.status, StatusCode::OK);
let body: ApiEnvelope<QueryData> = serde_json::from_slice(&r.body).unwrap();
assert_eq!(body.status.as_str(), "success");
let data = body.data.unwrap();
assert_eq!(data.result_type, "matrix");
let QueryResult::Matrix(series) = data.result else {
panic!("expected matrix")
};
assert!(!series.is_empty(), "rate range query should produce matrix");
}
#[test]
fn query_increase_returns_vector() {
    // increase() on a live counter produces at least one sample.
    let (db, _guard) = make_db_with_series();
    let resp = handle_query(Some("increase(http_requests_total[5m])"), Some("2"), &db);
    assert_eq!(resp.status, StatusCode::OK);
    let envelope: ApiEnvelope<QueryData> = serde_json::from_slice(&resp.body).unwrap();
    let samples = match envelope.data.unwrap().result {
        QueryResult::Vector(v) => v,
        QueryResult::Matrix(_) => panic!("expected vector"),
    };
    assert!(!samples.is_empty());
}
#[test]
fn query_avg_over_time_returns_vector() {
    // avg_over_time() on existing data returns a non-empty instant vector.
    let (db, _guard) = make_db_with_series();
    let resp = handle_query(
        Some("avg_over_time(http_requests_total[5m])"),
        Some("2"),
        &db,
    );
    assert_eq!(resp.status, StatusCode::OK);
    let envelope: ApiEnvelope<QueryData> = serde_json::from_slice(&resp.body).unwrap();
    let samples = match envelope.data.unwrap().result {
        QueryResult::Vector(v) => v,
        QueryResult::Matrix(_) => panic!("expected vector"),
    };
    assert!(!samples.is_empty());
}
#[test]
fn query_max_over_time_returns_vector() {
    // max_over_time() on existing data returns a non-empty instant vector.
    let (db, _guard) = make_db_with_series();
    let resp = handle_query(
        Some("max_over_time(http_requests_total[5m])"),
        Some("2"),
        &db,
    );
    assert_eq!(resp.status, StatusCode::OK);
    let envelope: ApiEnvelope<QueryData> = serde_json::from_slice(&resp.body).unwrap();
    let samples = match envelope.data.unwrap().result {
        QueryResult::Vector(v) => v,
        QueryResult::Matrix(_) => panic!("expected vector"),
    };
    assert!(!samples.is_empty());
}
#[test]
fn query_sum_by_returns_grouped_vector() {
    // sum by (job) collapses series into one sample per job, dropping __name__.
    let (db, _guard) = make_db_for_aggregation();
    let resp = handle_query(
        Some(r#"sum by (job) (http_requests_total)"#),
        Some("5"),
        &db,
    );
    assert_eq!(resp.status, StatusCode::OK);
    let envelope: ApiEnvelope<QueryData> = serde_json::from_slice(&resp.body).unwrap();
    let mut samples = match envelope.data.unwrap().result {
        QueryResult::Vector(v) => v,
        QueryResult::Matrix(_) => panic!("expected vector"),
    };
    sort_vector_samples(&mut samples);
    assert_eq!(samples.len(), 2, "should have 2 groups: api and web");
    // Look up a group by its "job" label; each is expected to exist.
    let group = |job: &str| {
        samples
            .iter()
            .find(|s| s.metric.get("job").map(String::as_str) == Some(job))
            .unwrap()
    };
    let api = group("api");
    let web = group("web");
    let api_val: f64 = api.value[1].as_str().unwrap().parse().unwrap();
    let web_val: f64 = web.value[1].as_str().unwrap().parse().unwrap();
    assert!(
        (api_val - 75.0).abs() < 1e-9,
        "api sum should be 75, got {}",
        api_val
    );
    assert!(
        (web_val - 15.0).abs() < 1e-9,
        "web sum should be 15, got {}",
        web_val
    );
    assert!(
        !api.metric.contains_key("__name__"),
        "sum by (job) should not preserve __name__"
    );
}
#[test]
fn query_avg_without_returns_grouped_vector() {
    // avg without (instance) groups by the remaining labels (job here).
    let (db, _guard) = make_db_for_aggregation();
    let resp = handle_query(
        Some(r#"avg without (instance) (http_requests_total)"#),
        Some("5"),
        &db,
    );
    assert_eq!(resp.status, StatusCode::OK);
    let envelope: ApiEnvelope<QueryData> = serde_json::from_slice(&resp.body).unwrap();
    let mut samples = match envelope.data.unwrap().result {
        QueryResult::Vector(v) => v,
        QueryResult::Matrix(_) => panic!("expected vector"),
    };
    sort_vector_samples(&mut samples);
    assert_eq!(samples.len(), 2, "should have 2 groups: api and web");
    let api = samples
        .iter()
        .find(|s| s.metric.get("job").map(String::as_str) == Some("api"))
        .unwrap();
    let api_val: f64 = api.value[1].as_str().unwrap().parse().unwrap();
    assert!(
        (api_val - 37.5).abs() < 1e-9,
        "api avg should be 37.5, got {}",
        api_val
    );
}
#[test]
fn query_count_returns_series_count() {
    // count() without a grouping clause collapses all matching series into a
    // single sample whose value is the number of series.
    let (db, _guard) = make_db_for_aggregation();
    let r = handle_query(Some(r#"count(http_requests_total)"#), Some("5"), &db);
    assert_eq!(r.status, StatusCode::OK);
    let body: ApiEnvelope<QueryData> = serde_json::from_slice(&r.body).unwrap();
    let QueryResult::Vector(samples) = body.data.unwrap().result else {
        panic!("expected vector")
    };
    assert_eq!(samples.len(), 1, "count with no grouping → single result");
    let count: f64 = samples[0].value[1].as_str().unwrap().parse().unwrap();
    // Compare with a tolerance instead of exact float equality (clippy::float_cmp):
    // the value round-trips through a JSON string, so exactness is incidental.
    // This also matches the 1e-9 convention used by the sibling aggregation tests.
    assert!(
        (count - 3.0).abs() < 1e-9,
        "should count 3 series, got {}",
        count
    );
}
#[test]
fn query_sum_by_rate_composite() {
    // A nested expression: rate() per series, then sum by (job) over the rates.
    let (db, _guard) = make_db_for_aggregation();
    let resp = handle_query(
        Some(r#"sum by (job) (rate(http_requests_total[5m]))"#),
        Some("5"),
        &db,
    );
    assert_eq!(resp.status, StatusCode::OK);
    let envelope: ApiEnvelope<QueryData> = serde_json::from_slice(&resp.body).unwrap();
    let mut samples = match envelope.data.unwrap().result {
        QueryResult::Vector(v) => v,
        QueryResult::Matrix(_) => panic!("expected vector"),
    };
    sort_vector_samples(&mut samples);
    assert_eq!(samples.len(), 2, "should have 2 groups: api and web");
    for sample in &samples {
        let value: f64 = sample.value[1].as_str().unwrap().parse().unwrap();
        assert!(value > 0.0, "sum of rates should be positive");
    }
}
#[test]
fn query_range_sum_by_rate_returns_matrix() {
    // Range evaluation of a grouped rate: one matrix series per job group.
    let (db, _guard) = make_db_for_aggregation();
    let resp = handle_query_range(
        Some(r#"sum by (job) (rate(http_requests_total[3s]))"#),
        Some("3"),
        Some("5"),
        Some("1s"),
        &db,
    );
    assert_eq!(resp.status, StatusCode::OK);
    let envelope: ApiEnvelope<QueryData> = serde_json::from_slice(&resp.body).unwrap();
    let payload = envelope.data.unwrap();
    assert_eq!(payload.result_type, "matrix");
    let mut series = match payload.result {
        QueryResult::Matrix(m) => m,
        QueryResult::Vector(_) => panic!("expected matrix"),
    };
    sort_matrix_series(&mut series);
    assert_eq!(series.len(), 2, "should have 2 groups: api and web");
    for group in &series {
        assert!(!group.values.is_empty(), "each group should have step values");
    }
}
#[test]
fn query_range_avg_over_time_downsamples() {
    // avg_over_time over a range produces one averaged value per step.
    let (db, _guard) = make_db_for_aggregation();
    let resp = handle_query_range(
        Some("avg_over_time(http_requests_total[3s])"),
        Some("2"),
        Some("5"),
        Some("1s"),
        &db,
    );
    assert_eq!(resp.status, StatusCode::OK);
    let envelope: ApiEnvelope<QueryData> = serde_json::from_slice(&resp.body).unwrap();
    let payload = envelope.data.unwrap();
    assert_eq!(payload.result_type, "matrix");
    let series = match payload.result {
        QueryResult::Matrix(m) => m,
        QueryResult::Vector(_) => panic!("expected matrix"),
    };
    assert!(
        !series.is_empty(),
        "avg_over_time range query should produce results"
    );
    for entry in &series {
        assert!(
            entry.values.len() >= 2,
            "should have multiple step values (downsampled)"
        );
    }
}
#[test]
fn query_rate_no_data_returns_empty_vector() {
    // rate() over an unknown metric is not an error — just an empty vector.
    let (db, _guard) = make_db_with_series();
    let resp = handle_query(Some(r#"rate(nonexistent[5m])"#), Some("2"), &db);
    assert_eq!(resp.status, StatusCode::OK);
    let envelope: ApiEnvelope<QueryData> = serde_json::from_slice(&resp.body).unwrap();
    let samples = match envelope.data.unwrap().result {
        QueryResult::Vector(v) => v,
        QueryResult::Matrix(_) => panic!("expected vector"),
    };
    assert!(samples.is_empty());
}
#[test]
fn query_unsupported_function_returns_422() {
    // Functions outside the supported surface (histogram_quantile) are rejected.
    let (db, _guard) = make_db_with_series();
    let resp = handle_query(Some("histogram_quantile(0.9, rate(x[5m]))"), Some("2"), &db);
    assert_eq!(resp.status, StatusCode::UNPROCESSABLE_ENTITY);
}
#[test]
fn query_binary_expr_returns_422() {
    // Binary operations between vectors are unsupported and must be rejected.
    let (db, _guard) = make_db_with_series();
    let resp = handle_query(Some("metric_a + metric_b"), Some("2"), &db);
    assert_eq!(resp.status, StatusCode::UNPROCESSABLE_ENTITY);
}
#[test]
fn query_sum_of_empty_set_returns_empty() {
    // Aggregating over zero matching series yields no groups at all.
    let (db, _guard) = make_db_with_series();
    let resp = handle_query(Some(r#"sum by (job) (nonexistent)"#), Some("2"), &db);
    assert_eq!(resp.status, StatusCode::OK);
    let envelope: ApiEnvelope<QueryData> = serde_json::from_slice(&resp.body).unwrap();
    let samples = match envelope.data.unwrap().result {
        QueryResult::Vector(v) => v,
        QueryResult::Matrix(_) => panic!("expected vector"),
    };
    assert!(samples.is_empty(), "sum of empty set should be empty");
}
}