use crate::error::DbError;
use crate::types::{TagSet, TimeSeriesChunk, Timestamp, Value};
use rayon::prelude::*;
use std::ops::Range;
use std::sync::RwLockReadGuard;
/// Scan a chunk for points inside `time_range` (half-open: start inclusive,
/// end exclusive), optionally keeping only points whose tags contain every
/// pair in `tag_filter`.
///
/// Takes the read guard by value so the lock is released when the query
/// finishes. Returns `(timestamp, value)` pairs in timestamp order, or
/// `DbError::InvalidTimeRange` when `start >= end`.
pub fn execute_query(
    chunk_guard: RwLockReadGuard<TimeSeriesChunk>,
    time_range: Range<Timestamp>,
    tag_filter: Option<&TagSet>,
) -> Result<Vec<(Timestamp, Value)>, DbError> {
    // An empty or inverted range is a caller error; report it explicitly.
    let Range { start, end } = time_range;
    if start >= end {
        return Err(DbError::InvalidTimeRange { start, end });
    }

    let chunk = &*chunk_guard;

    // Timestamps are stored sorted, so binary search locates the half-open
    // window [lo, hi) of rows falling inside the requested range.
    let lo = chunk.timestamps.partition_point(|&ts| ts < start);
    let hi = chunk.timestamps.partition_point(|&ts| ts < end);
    if lo >= hi {
        return Ok(Vec::new());
    }

    // Slice the three parallel columns once, then scan them with rayon.
    // Zipped slice iteration (indexed parallel iterators) preserves the
    // original ordering and avoids per-row bounds checks.
    let timestamps = &chunk.timestamps[lo..hi];
    let values = &chunk.values[lo..hi];
    let tags = &chunk.tags[lo..hi];

    let matches = timestamps
        .par_iter()
        .zip(values.par_iter())
        .zip(tags.par_iter())
        .filter_map(|((&ts, &value), point_tags)| {
            let keep = tag_filter.map_or(true, |filter| check_tags(point_tags, filter));
            keep.then_some((ts, value))
        })
        .collect();

    Ok(matches)
}
/// Returns true when every `(key, value)` pair in `filter_tags` is present
/// with the same value in `point_tags`; an empty filter matches any point.
#[inline]
fn check_tags(point_tags: &TagSet, filter_tags: &TagSet) -> bool {
    // Fast path: a filter with more pairs than the point carries can never
    // be a subset, so skip the per-key lookups entirely.
    filter_tags.len() <= point_tags.len()
        && filter_tags
            .iter()
            .all(|(key, value)| point_tags.get(key).map_or(false, |v| v == value))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{DataPoint, TimeSeriesChunk};
    use std::sync::{Arc, RwLock};

    // Fixed, strictly increasing timestamps used by every test.
    //
    // The previous fixture derived timestamps from SystemTime::now() with a
    // 1 ns sleep between reads. On platforms with coarse clock resolution a
    // 1 ns sleep does not guarantee the clock advances, so the ordering
    // assert could fail spuriously. Hard-coded constants make the tests
    // deterministic and faster.
    const TS1: Timestamp = 1_000;
    const TS2: Timestamp = 2_000;
    const TS3: Timestamp = 3_000;
    const TS4: Timestamp = 4_000;
    const TS5: Timestamp = 5_000;

    /// Build a `TagSet` from borrowed `(key, value)` pairs.
    fn create_tags(pairs: &[(&str, &str)]) -> TagSet {
        pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    /// Convert row-oriented `DataPoint`s into the columnar chunk layout.
    fn create_test_chunk(points: Vec<DataPoint>) -> TimeSeriesChunk {
        let mut chunk = TimeSeriesChunk::default();
        for point in points {
            chunk.timestamps.push(point.timestamp);
            chunk.values.push(point.value);
            chunk.tags.push(point.tags);
        }
        chunk
    }

    /// Shorthand for a data point with no tags.
    fn untagged(timestamp: Timestamp, value: Value) -> DataPoint {
        DataPoint {
            timestamp,
            value,
            tags: TagSet::new(),
        }
    }

    #[test]
    fn test_execute_query_time_range() {
        let points = vec![
            untagged(TS1, 1.0),
            untagged(TS2, 2.0),
            untagged(TS3, 3.0),
            untagged(TS4, 4.0),
            untagged(TS5, 5.0),
        ];
        let chunk_arc = Arc::new(RwLock::new(create_test_chunk(points)));

        // Full range: every point is returned, in timestamp order.
        {
            let chunk_guard = chunk_arc.read().unwrap();
            let all_results = execute_query(chunk_guard, TS1..(TS5 + 1), None).unwrap();
            assert_eq!(
                all_results,
                vec![(TS1, 1.0), (TS2, 2.0), (TS3, 3.0), (TS4, 4.0), (TS5, 5.0)]
            );
        }
        // Interior range: only the covered points.
        {
            let chunk_guard = chunk_arc.read().unwrap();
            let partial_results = execute_query(chunk_guard, TS2..(TS4 + 1), None).unwrap();
            assert_eq!(partial_results, vec![(TS2, 2.0), (TS3, 3.0), (TS4, 4.0)]);
        }
        // Range entirely after the data: empty result, not an error.
        {
            let chunk_guard = chunk_arc.read().unwrap();
            let no_results = execute_query(chunk_guard, (TS5 + 1)..(TS5 + 100), None).unwrap();
            assert!(no_results.is_empty());
        }
        // Inverted range is rejected with the offending bounds echoed back.
        {
            let chunk_guard = chunk_arc.read().unwrap();
            let invalid_range_result = execute_query(chunk_guard, TS3..TS2, None);
            assert!(invalid_range_result.is_err());
            if let Err(DbError::InvalidTimeRange { start, end }) = invalid_range_result {
                assert_eq!(start, TS3);
                assert_eq!(end, TS2);
            } else {
                panic!("Expected InvalidTimeRange error");
            }
        }
        // An empty (start == end) range is likewise invalid.
        {
            let chunk_guard = chunk_arc.read().unwrap();
            assert!(execute_query(chunk_guard, TS3..TS3, None).is_err());
        }
    }

    #[test]
    fn test_execute_query_with_tags() {
        let tags_host1 = create_tags(&[("host", "server1"), ("region", "us-east")]);
        let tags_host2 = create_tags(&[("host", "server2"), ("region", "us-east")]);
        let tags_host3 = create_tags(&[("host", "server3"), ("region", "us-west")]);
        let points = vec![
            DataPoint {
                timestamp: TS1,
                value: 1.0,
                tags: tags_host1,
            },
            DataPoint {
                timestamp: TS2,
                value: 2.0,
                tags: tags_host2,
            },
            DataPoint {
                timestamp: TS3,
                value: 3.0,
                tags: tags_host3,
            },
        ];
        let chunk_arc = Arc::new(RwLock::new(create_test_chunk(points)));

        // A single-tag filter matching exactly one point.
        {
            let chunk_guard = chunk_arc.read().unwrap();
            let host1_filter = create_tags(&[("host", "server1")]);
            let host1_results =
                execute_query(chunk_guard, TS1..(TS3 + 1), Some(&host1_filter)).unwrap();
            assert_eq!(host1_results, vec![(TS1, 1.0)]);
        }
        // A shared tag value matches multiple points.
        // (Fixed: this call previously contained a mojibake — `&reg` in
        // `&region_east_filter` had been corrupted to the `®` character.)
        {
            let chunk_guard = chunk_arc.read().unwrap();
            let region_east_filter = create_tags(&[("region", "us-east")]);
            let region_results =
                execute_query(chunk_guard, TS1..(TS3 + 1), Some(&region_east_filter)).unwrap();
            assert_eq!(region_results, vec![(TS1, 1.0), (TS2, 2.0)]);
        }
        // A multi-tag filter requires every pair to match.
        {
            let chunk_guard = chunk_arc.read().unwrap();
            let multi_filter = create_tags(&[("host", "server2"), ("region", "us-east")]);
            let multi_results =
                execute_query(chunk_guard, TS1..(TS3 + 1), Some(&multi_filter)).unwrap();
            assert_eq!(multi_results, vec![(TS2, 2.0)]);
        }
        // A filter matching nothing yields an empty result.
        {
            let chunk_guard = chunk_arc.read().unwrap();
            let no_match_filter = create_tags(&[("host", "nonexistent")]);
            let no_match_results =
                execute_query(chunk_guard, TS1..(TS3 + 1), Some(&no_match_filter)).unwrap();
            assert!(no_match_results.is_empty());
        }
    }

    #[test]
    fn test_check_tags() {
        let point_tags = create_tags(&[("host", "server1"), ("region", "us-east")]);

        // Exact match and proper subset both pass.
        let filter_exact = create_tags(&[("host", "server1"), ("region", "us-east")]);
        assert!(check_tags(&point_tags, &filter_exact));
        let filter_subset = create_tags(&[("host", "server1")]);
        assert!(check_tags(&point_tags, &filter_subset));

        // Same key, different value: no match.
        let filter_diff_value = create_tags(&[("host", "server2")]);
        assert!(!check_tags(&point_tags, &filter_diff_value));

        // Key absent from the point: no match.
        let filter_bad_key = create_tags(&[("nonexistent", "value")]);
        assert!(!check_tags(&point_tags, &filter_bad_key));

        // A filter with more pairs than the point can never match.
        let filter_too_many =
            create_tags(&[("host", "server1"), ("region", "us-east"), ("extra", "tag")]);
        assert!(!check_tags(&point_tags, &filter_too_many));

        // An untagged point fails any non-empty filter...
        let empty_tags = TagSet::new();
        let any_filter = create_tags(&[("host", "any")]);
        assert!(!check_tags(&empty_tags, &any_filter));

        // ...while an empty filter matches everything.
        let empty_filter = TagSet::new();
        assert!(check_tags(&point_tags, &empty_filter));
    }

    #[test]
    fn test_execute_query_edge_cases() {
        // Querying an empty chunk returns an empty result set.
        let empty_arc = Arc::new(RwLock::new(TimeSeriesChunk::default()));
        {
            let empty_guard = empty_arc.read().unwrap();
            let empty_results = execute_query(empty_guard, 0..100, None).unwrap();
            assert!(empty_results.is_empty());
        }

        let ts = TS3;
        let point = DataPoint {
            timestamp: ts,
            value: 42.0,
            tags: create_tags(&[("single", "point")]),
        };
        let single_arc = Arc::new(RwLock::new(create_test_chunk(vec![point])));

        // A range covering exactly the point's timestamp includes it.
        {
            let single_guard = single_arc.read().unwrap();
            let exact_results = execute_query(single_guard, ts..(ts + 1), None).unwrap();
            assert_eq!(exact_results, vec![(ts, 42.0)]);
        }
        // The range start is inclusive.
        {
            let single_guard = single_arc.read().unwrap();
            let start_at_results = execute_query(single_guard, ts..(ts + 100), None).unwrap();
            assert_eq!(start_at_results.len(), 1);
        }
        // The range end is exclusive.
        {
            let single_guard = single_arc.read().unwrap();
            let end_at_results = execute_query(single_guard, (ts - 100)..ts, None).unwrap();
            assert!(end_at_results.is_empty());
        }
    }
}