#![allow(unused_imports)]
use std::collections::HashMap;
use std::fs;
use std::ops::Range;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::SystemTime;
use ugnos::*;
/// Builds a `TagSet` from borrowed `(key, value)` string pairs by
/// converting each side to an owned `String`.
fn tags_from(pairs: &[(&str, &str)]) -> TagSet {
    pairs
        .iter()
        .map(|&(key, value)| (key.to_owned(), value.to_owned()))
        .collect()
}
/// Sorts query results in place by ascending timestamp (stable sort, so
/// equal timestamps keep their relative order).
fn sort_results(results: &mut [(Timestamp, Value)]) {
    results.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
}
/// Creates a fresh, empty `./test_<dir_name>` directory for a test run,
/// first removing any residue left behind by a previous run.
fn setup_test_dir(dir_name: &str) -> PathBuf {
    let mut name = String::from("test_");
    name.push_str(dir_name);
    let path = PathBuf::from(".").join(name);
    // Best effort: the directory usually does not exist yet.
    let _ = fs::remove_dir_all(&path);
    fs::create_dir_all(&path).unwrap();
    path
}
// Basic single-series coverage: out-of-order inserts, full-range and
// sub-range queries, and tag filtering (single tag, full tag set,
// non-matching and conjunctive filters).
#[test]
fn test_insert_and_query_single_series() {
    let data_dir = setup_test_dir("insert_and_query_single_series");
    let config = DbConfig {
        flush_interval: Duration::from_millis(50),
        data_dir: data_dir.clone(),
        enable_segments: true,
        enable_wal: false,
        enable_snapshots: false,
        snapshot_interval: Duration::from_secs(60 * 60),
        ..DbConfig::default()
    };
    let db = DbCore::with_config(config).unwrap();
    let series_name = "test_series_1";
    let tags1 = tags_from(&[("host", "serverA"), ("region", "us-east")]);
    let tags2 = tags_from(&[("host", "serverB"), ("region", "us-west")]);
    // Insert deliberately out of timestamp order (150 comes last).
    db.insert(series_name, 100, 10.5, tags1.clone()).unwrap();
    db.insert(series_name, 200, 11.0, tags1.clone()).unwrap();
    db.insert(series_name, 300, 12.5, tags2.clone()).unwrap();
    db.insert(series_name, 150, 9.0, tags1.clone()).unwrap();
    db.flush().unwrap();
    // Full range: all four points must be visible.
    let mut results_all = db.query(series_name, 0..400, None).unwrap();
    sort_results(&mut results_all);
    assert_eq!(results_all.len(), 4);
    assert_eq!(
        results_all,
        vec![(100, 10.5), (150, 9.0), (200, 11.0), (300, 12.5)]
    );
    // Half-open sub-range [120, 250): only the 150 and 200 points.
    let mut results_range = db.query(series_name, 120..250, None).unwrap();
    sort_results(&mut results_range);
    assert_eq!(results_range.len(), 2);
    assert_eq!(results_range, vec![(150, 9.0), (200, 11.0)]);
    // Single-tag filter matches every point inserted with tags1.
    let filter1 = tags_from(&[("host", "serverA")]);
    let mut results_tag1 = db.query(series_name, 0..400, Some(&filter1)).unwrap();
    sort_results(&mut results_tag1);
    assert_eq!(results_tag1.len(), 3);
    assert_eq!(results_tag1, vec![(100, 10.5), (150, 9.0), (200, 11.0)]);
    // Filtering on the complete tags1 set yields the same three points.
    let filter1_full = tags_from(&[("host", "serverA"), ("region", "us-east")]);
    let mut results_tag1_full = db.query(series_name, 0..400, Some(&filter1_full)).unwrap();
    sort_results(&mut results_tag1_full);
    assert_eq!(results_tag1_full.len(), 3);
    assert_eq!(
        results_tag1_full,
        vec![(100, 10.5), (150, 9.0), (200, 11.0)]
    );
    // tags2 filter: only the single point at ts=300.
    let filter2 = tags_from(&[("region", "us-west")]);
    let mut results_tag2 = db.query(series_name, 0..400, Some(&filter2)).unwrap();
    sort_results(&mut results_tag2);
    assert_eq!(results_tag2.len(), 1);
    assert_eq!(results_tag2, vec![(300, 12.5)]);
    // A tag value never inserted matches nothing.
    let filter_no_match = tags_from(&[("host", "serverC")]);
    let results_no_match = db
        .query(series_name, 0..400, Some(&filter_no_match))
        .unwrap();
    assert!(results_no_match.is_empty());
    // Filters are conjunctive: mixing keys from tags1 and tags2 matches nothing.
    let filter_partial = tags_from(&[("host", "serverA"), ("region", "us-west")]);
    let results_partial = db
        .query(series_name, 0..400, Some(&filter_partial))
        .unwrap();
    assert!(results_partial.is_empty());
    // Remove the on-disk test directory (other tests in this file do too).
    drop(db);
    let _ = fs::remove_dir_all(data_dir);
}
// Querying a series that was never inserted must fail with
// `DbError::SeriesNotFound` carrying the requested series name.
#[test]
fn test_query_non_existent_series() {
    let data_dir = setup_test_dir("query_non_existent_series");
    let config = DbConfig {
        flush_interval: Duration::from_millis(50),
        data_dir: data_dir.clone(),
        enable_segments: true,
        enable_wal: false,
        enable_snapshots: false,
        snapshot_interval: Duration::from_secs(60 * 60),
        ..DbConfig::default()
    };
    let db = DbCore::with_config(config).unwrap();
    let result = db.query("non_existent_series", 0..100, None);
    match result {
        Err(DbError::SeriesNotFound(name)) => assert_eq!(name, "non_existent_series"),
        _ => panic!("Expected SeriesNotFound error"),
    }
    // Remove the on-disk test directory (other tests in this file do too).
    drop(db);
    let _ = fs::remove_dir_all(data_dir);
}
// Two independent series written in an interleaved order must not leak
// points into each other's query results.
#[test]
fn test_insert_multiple_series() {
    let data_dir = setup_test_dir("insert_multiple_series");
    let config = DbConfig {
        flush_interval: Duration::from_millis(50),
        data_dir: data_dir.clone(),
        enable_segments: true,
        enable_wal: false,
        enable_snapshots: false,
        snapshot_interval: Duration::from_secs(60 * 60),
        ..DbConfig::default()
    };
    let db = DbCore::with_config(config).unwrap();
    let series1 = "cpu_usage";
    let series2 = "memory_usage";
    let tags_s1 = tags_from(&[("host", "server1")]);
    let tags_s2 = tags_from(&[("host", "server2")]);
    // Interleave inserts across the two series.
    db.insert(series1, 100, 0.8, tags_s1.clone()).unwrap();
    db.insert(series2, 110, 55.2, tags_s2.clone()).unwrap();
    db.insert(series1, 200, 0.7, tags_s1.clone()).unwrap();
    db.insert(series2, 210, 56.8, tags_s2.clone()).unwrap();
    db.flush().unwrap();
    let mut results1 = db.query(series1, 0..300, None).unwrap();
    sort_results(&mut results1);
    assert_eq!(results1, vec![(100, 0.8), (200, 0.7)]);
    let mut results2 = db.query(series2, 0..300, None).unwrap();
    sort_results(&mut results2);
    assert_eq!(results2, vec![(110, 55.2), (210, 56.8)]);
    // Remove the on-disk test directory (other tests in this file do too).
    drop(db);
    let _ = fs::remove_dir_all(data_dir);
}
// Four writer threads insert disjoint timestamp ranges into one series;
// afterwards every timestamp 0..400 must be present exactly once.
#[test]
fn test_concurrent_inserts() {
    let data_dir = setup_test_dir("concurrent_inserts");
    let config = DbConfig {
        flush_interval: Duration::from_millis(20),
        data_dir: data_dir.clone(),
        enable_segments: true,
        enable_wal: false,
        enable_snapshots: false,
        snapshot_interval: Duration::from_secs(60 * 60),
        ..DbConfig::default()
    };
    let db = Arc::new(DbCore::with_config(config).unwrap());
    let num_threads = 4;
    let points_per_thread = 100;
    let series_name = "concurrent_series";
    let mut handles = vec![];
    for i in 0..num_threads {
        let db_clone = Arc::clone(&db);
        let handle = thread::spawn(move || {
            // Each thread owns the timestamp range [i*100, (i+1)*100).
            let start_ts = (i * points_per_thread) as u64;
            for j in 0..points_per_thread {
                let ts = start_ts + j as u64;
                let val = ts as f64 * 1.1;
                let tags =
                    tags_from(&[("thread_id", &i.to_string()), ("point_id", &j.to_string())]);
                db_clone.insert(series_name, ts, val, tags).unwrap();
            }
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    db.flush().unwrap();
    let total_points = num_threads * points_per_thread;
    let results = db
        .query(series_name, 0..(total_points as u64), None)
        .unwrap();
    assert_eq!(results.len(), total_points);
    // Every timestamp in 0..total_points must appear exactly once.
    let mut timestamps: Vec<u64> = results.iter().map(|(ts, _)| *ts).collect();
    timestamps.sort_unstable();
    for i in 0..(total_points as u64) {
        assert_eq!(timestamps[i as usize], i);
    }
    // Remove the on-disk test directory (other tests in this file do too).
    drop(db);
    let _ = fs::remove_dir_all(data_dir);
}
// Queries with start > end or start == end must be rejected with
// `DbError::InvalidTimeRange` echoing the offending bounds.
#[test]
fn test_invalid_time_range() {
    let data_dir = setup_test_dir("invalid_time_range");
    let config = DbConfig {
        flush_interval: Duration::from_millis(50),
        data_dir: data_dir.clone(),
        enable_segments: true,
        enable_wal: false,
        enable_snapshots: false,
        snapshot_interval: Duration::from_secs(60 * 60),
        ..DbConfig::default()
    };
    let db = DbCore::with_config(config).unwrap();
    db.insert("test", 100, 1.0, TagSet::new()).unwrap();
    db.flush().unwrap();
    // Reversed range (start > end).
    #[allow(clippy::reversed_empty_ranges)]
    let result = db.query("test", 100..50, None);
    match result {
        Err(DbError::InvalidTimeRange { start, end }) => {
            assert_eq!(start, 100);
            assert_eq!(end, 50);
        }
        _ => panic!("Expected InvalidTimeRange error"),
    }
    // Empty range (start == end) is also invalid, not just empty output.
    let result_equal = db.query("test", 100..100, None);
    match result_equal {
        Err(DbError::InvalidTimeRange { start, end }) => {
            assert_eq!(start, 100);
            assert_eq!(end, 100);
        }
        _ => panic!("Expected InvalidTimeRange error for equal start/end"),
    }
    // Remove the on-disk test directory (other tests in this file do too).
    drop(db);
    let _ = fs::remove_dir_all(data_dir);
}
// End-to-end persistence test: write data, take a manual snapshot, write
// more data, drop the instance, then recover into a fresh instance and
// verify that every point (pre- and post-snapshot) is present.
#[test]
fn test_snapshot_and_recover() {
    let data_dir = setup_test_dir("snapshot_recover");
    let config = DbConfig {
        flush_interval: Duration::from_millis(50),
        data_dir: data_dir.clone(),
        // Small WAL buffer so writes reach disk after only a few inserts.
        wal_buffer_size: 10,
        enable_wal: true,
        enable_snapshots: true,
        // Long interval: the snapshot is triggered manually via db.snapshot().
        snapshot_interval: Duration::from_secs(10),
        enable_segments: false,
        ..DbConfig::default()
    };
    let mut pre_snapshot_timestamps = Vec::new();
    let mut post_snapshot_timestamps = Vec::new();
    // Scope the first instance so it is dropped (shut down) before recovery.
    {
        let db = DbCore::with_config(config.clone()).unwrap();
        let series_name = "test_snapshot_series";
        let tags1 = tags_from(&[("host", "serverA"), ("region", "us-east")]);
        let tags2 = tags_from(&[("host", "serverB"), ("region", "us-west")]);
        println!("Inserting pre-snapshot data...");
        for i in 0..100 {
            // Wall-clock nanoseconds plus `i` keeps timestamps unique even
            // when the clock does not advance between iterations.
            let timestamp = SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos() as u64
                + i;
            let val = i as f64 * 1.5;
            // Alternate between the two tag sets.
            let tags = if i % 2 == 0 {
                tags1.clone()
            } else {
                tags2.clone()
            };
            db.insert(series_name, timestamp, val, tags).unwrap();
            pre_snapshot_timestamps.push((timestamp, val));
            std::thread::sleep(Duration::from_nanos(1));
        }
        db.flush().unwrap();
        // Give background flush/WAL work a moment to settle.
        thread::sleep(Duration::from_millis(100));
        println!("Creating snapshot...");
        db.snapshot().unwrap();
        thread::sleep(Duration::from_millis(100));
        println!("Inserting post-snapshot data...");
        // These 50 points exist only in the WAL, not in the snapshot.
        for i in 0..50 {
            let timestamp = SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos() as u64
                + i;
            let val = i as f64 * 2.0;
            let tags = if i % 2 == 0 {
                tags1.clone()
            } else {
                tags2.clone()
            };
            db.insert(series_name, timestamp, val, tags).unwrap();
            post_snapshot_timestamps.push((timestamp, val));
            std::thread::sleep(Duration::from_nanos(1));
        }
        db.flush().unwrap();
        thread::sleep(Duration::from_millis(100));
        println!("First DB instance being dropped...");
    }
    println!("Creating new DB instance and recovering data...");
    {
        let mut db = DbCore::with_config(config).unwrap();
        // Recovery should load the snapshot and replay WAL entries after it.
        db.recover().unwrap();
        let series_name = "test_snapshot_series";
        // Build a query window spanning every timestamp recorded above.
        let min_ts = pre_snapshot_timestamps
            .iter()
            .chain(post_snapshot_timestamps.iter())
            .map(|(ts, _)| *ts)
            .min()
            .unwrap_or(0);
        let max_ts = pre_snapshot_timestamps
            .iter()
            .chain(post_snapshot_timestamps.iter())
            .map(|(ts, _)| *ts)
            .max()
            .unwrap_or(u64::MAX);
        let mut results = db.query(series_name, min_ts..(max_ts + 1), None).unwrap();
        sort_results(&mut results);
        println!(
            "Expected {} total points ({} pre-snapshot, {} post-snapshot)",
            pre_snapshot_timestamps.len() + post_snapshot_timestamps.len(),
            pre_snapshot_timestamps.len(),
            post_snapshot_timestamps.len()
        );
        println!("Recovered {} points", results.len());
        assert_eq!(
            results.len(),
            pre_snapshot_timestamps.len() + post_snapshot_timestamps.len(),
            "Expected {} total points, but got {}",
            pre_snapshot_timestamps.len() + post_snapshot_timestamps.len(),
            results.len()
        );
        // Every recorded (timestamp, value) pair must have survived recovery.
        for (timestamp, value) in &pre_snapshot_timestamps {
            assert!(
                results.contains(&(*timestamp, *value)),
                "Pre-snapshot point ({}, {}) not found in recovered data",
                timestamp,
                value
            );
        }
        for (timestamp, value) in &post_snapshot_timestamps {
            assert!(
                results.contains(&(*timestamp, *value)),
                "Post-snapshot point ({}, {}) not found in recovered data",
                timestamp,
                value
            );
        }
        println!("All data points were successfully recovered!");
    }
    // Best-effort cleanup of the on-disk test directory.
    let _ = fs::remove_dir_all(data_dir);
}
// With WAL, snapshots and segments all disabled, recover() must succeed
// as a no-op even though nothing was ever persisted.
#[test]
fn test_recover_with_no_persistence() {
    // Deliberately NOT created via setup_test_dir: this also covers the case
    // where the data directory does not exist before the DB is opened.
    let data_dir = PathBuf::from("./temp_no_persistence");
    let config = DbConfig {
        flush_interval: Duration::from_millis(50),
        data_dir: data_dir.clone(),
        wal_buffer_size: 10,
        enable_wal: false,
        enable_snapshots: false,
        snapshot_interval: Duration::from_secs(10),
        enable_segments: false,
        ..DbConfig::default()
    };
    let mut db = DbCore::with_config(config).unwrap();
    let result = db.recover();
    assert!(result.is_ok());
    // The DB may have created the directory; remove any residue (best effort).
    drop(db);
    let _ = fs::remove_dir_all(data_dir);
}
// Recovery from the write-ahead log alone (snapshots and segments disabled):
// a second instance must see every flushed point after recover().
#[test]
fn test_wal_recovery_only() {
    let data_dir = setup_test_dir("wal_only");
    let config = DbConfig {
        flush_interval: Duration::from_millis(50),
        data_dir: data_dir.clone(),
        wal_buffer_size: 10,
        enable_wal: true,
        enable_snapshots: false,
        snapshot_interval: Duration::from_secs(10),
        enable_segments: false,
        ..DbConfig::default()
    };
    // First instance: write 50 points so they land in the WAL, then drop it.
    {
        let db = DbCore::with_config(config.clone()).unwrap();
        let tags = tags_from(&[("host", "serverA"), ("region", "us-east")]);
        for i in 0..50 {
            db.insert("test_wal_series", 1000 + i * 10, i as f64, tags.clone())
                .unwrap();
        }
        db.flush().unwrap();
        thread::sleep(Duration::from_millis(100));
    }
    // Second instance: WAL replay must restore all 50 points.
    {
        let mut db = DbCore::with_config(config).unwrap();
        db.recover().unwrap();
        let mut recovered = db.query("test_wal_series", 0..2000, None).unwrap();
        sort_results(&mut recovered);
        assert_eq!(recovered.len(), 50);
        // Spot-check the first and last points, then verify the full set.
        assert!(recovered.contains(&(1000, 0.0)));
        assert!(recovered.contains(&(1490, 49.0)));
        for i in 0..50 {
            assert!(recovered.contains(&(1000 + i * 10, i as f64)));
        }
    }
    let _ = fs::remove_dir_all(data_dir);
}
// Calling snapshot() while snapshots are disabled must fail with a
// ConfigError whose message names the problem.
#[test]
fn test_snapshot_method_error() {
    let data_dir = setup_test_dir("snapshot_method_error");
    let config = DbConfig {
        flush_interval: Duration::from_millis(50),
        data_dir: data_dir.clone(),
        wal_buffer_size: 10,
        enable_wal: true,
        enable_snapshots: false,
        snapshot_interval: Duration::from_secs(10),
        enable_segments: false,
        ..DbConfig::default()
    };
    let db = DbCore::with_config(config).unwrap();
    let result = db.snapshot();
    assert!(result.is_err());
    match result {
        Err(DbError::ConfigError(msg)) => {
            assert!(msg.contains("Snapshots are not enabled"))
        }
        _ => panic!("Expected ConfigError but got something else"),
    }
    let _ = fs::remove_dir_all(data_dir);
}
// Recovery test with three data phases: (1) flushed and snapshotted,
// (2) flushed after the snapshot (WAL only), (3) never explicitly flushed
// before drop. Phases 1 and 2 must always be recovered; phase 3 is
// best-effort (it may survive if the DB flushes on drop — TODO confirm).
#[test]
fn test_wal_recovery_after_snapshot() {
    let data_dir = setup_test_dir("snapshot_wal_recovery");
    let config = DbConfig {
        flush_interval: Duration::from_millis(50),
        data_dir: data_dir.clone(),
        // Tiny WAL buffer so entries hit disk after only a few inserts.
        wal_buffer_size: 5,
        enable_wal: true,
        enable_snapshots: true,
        snapshot_interval: Duration::from_secs(10),
        enable_segments: false,
        ..DbConfig::default()
    };
    let series_name = "test_recovery_series";
    let mut pre_snapshot_timestamps = Vec::new();
    let mut post_snapshot_timestamps = Vec::new();
    let mut unflushed_timestamps = Vec::new();
    // First instance: write the three phases, then drop WITHOUT a final flush.
    {
        let db = DbCore::with_config(config.clone()).unwrap();
        let tags = tags_from(&[("host", "server1"), ("region", "us-east")]);
        // Phase 1: ten points, flushed and then captured by the snapshot.
        for i in 0..10 {
            // Wall-clock nanoseconds plus `i` keeps timestamps unique.
            let timestamp = SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos() as u64
                + i;
            let val = i as f64;
            db.insert(series_name, timestamp, val, tags.clone())
                .unwrap();
            pre_snapshot_timestamps.push((timestamp, val));
            std::thread::sleep(Duration::from_nanos(1));
        }
        db.flush().unwrap();
        thread::sleep(Duration::from_millis(100));
        db.snapshot().unwrap();
        thread::sleep(Duration::from_millis(100));
        // Phase 2: ten more points, flushed after the snapshot (WAL only).
        for i in 10..20 {
            let timestamp = SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos() as u64
                + i;
            let val = i as f64;
            db.insert(series_name, timestamp, val, tags.clone())
                .unwrap();
            post_snapshot_timestamps.push((timestamp, val));
            std::thread::sleep(Duration::from_nanos(1));
        }
        db.flush().unwrap();
        thread::sleep(Duration::from_millis(100));
        // Phase 3: ten points left unflushed when the instance is dropped.
        for i in 20..30 {
            let timestamp = SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos() as u64
                + i;
            let val = i as f64;
            db.insert(series_name, timestamp, val, tags.clone())
                .unwrap();
            unflushed_timestamps.push((timestamp, val));
            std::thread::sleep(Duration::from_nanos(1));
        }
    }
    // Second instance: recover and verify the guarantees per phase.
    {
        let mut db = DbCore::with_config(config).unwrap();
        db.recover().unwrap();
        // Query window spanning every timestamp recorded above.
        let min_ts = pre_snapshot_timestamps
            .iter()
            .chain(post_snapshot_timestamps.iter())
            .chain(unflushed_timestamps.iter())
            .map(|(ts, _)| *ts)
            .min()
            .unwrap_or(0);
        let max_ts = pre_snapshot_timestamps
            .iter()
            .chain(post_snapshot_timestamps.iter())
            .chain(unflushed_timestamps.iter())
            .map(|(ts, _)| *ts)
            .max()
            .unwrap_or(u64::MAX);
        let mut results = db.query(series_name, min_ts..(max_ts + 1), None).unwrap();
        sort_results(&mut results);
        // Flushed data is mandatory; unflushed data may or may not survive.
        let expected_min = pre_snapshot_timestamps.len() + post_snapshot_timestamps.len();
        let expected_max = expected_min + unflushed_timestamps.len();
        println!("Recovered {} points", results.len());
        println!(
            "Expected between {} and {} points",
            expected_min, expected_max
        );
        assert!(
            results.len() >= expected_min,
            "Expected at least {} points, got {}",
            expected_min,
            results.len()
        );
        for (timestamp, value) in &pre_snapshot_timestamps {
            assert!(
                results.contains(&(*timestamp, *value)),
                "Pre-snapshot point ({}, {}) not found in recovered data",
                timestamp,
                value
            );
        }
        for (timestamp, value) in &post_snapshot_timestamps {
            assert!(
                results.contains(&(*timestamp, *value)),
                "Post-snapshot point ({}, {}) not found in recovered data",
                timestamp,
                value
            );
        }
        // Informational only: report how much unflushed data survived.
        if results.len() > expected_min {
            let unflushed_recovered = unflushed_timestamps
                .iter()
                .filter(|(ts, val)| results.contains(&(*ts, *val)))
                .count();
            println!(
                "Recovered {}/{} points from unflushed batch (auto-flush on drop)",
                unflushed_recovered,
                unflushed_timestamps.len()
            );
        }
    }
    // Best-effort cleanup of the on-disk test directory.
    let _ = fs::remove_dir_all(data_dir);
}
// Edge cases around tags: inserting with an empty tag set, querying with an
// empty filter, querying a never-written series, and a non-matching filter.
#[test]
fn test_empty_inserts_and_tags() {
    let data_dir = setup_test_dir("empty_inserts_and_tags");
    let config = DbConfig {
        flush_interval: Duration::from_millis(50),
        data_dir: data_dir.clone(),
        enable_segments: true,
        enable_wal: false,
        enable_snapshots: false,
        snapshot_interval: Duration::from_secs(60 * 60),
        ..DbConfig::default()
    };
    let db = DbCore::with_config(config).unwrap();
    let empty_tags = TagSet::new();
    let series_name = "empty_tags_series";
    let timestamp = SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_nanos() as u64;
    db.insert(series_name, timestamp, 42.0, empty_tags.clone())
        .unwrap();
    db.flush().unwrap();
    // An empty filter matches the point inserted with empty tags.
    let results = db
        .query(series_name, 0..(timestamp + 1), Some(&empty_tags))
        .unwrap();
    assert_eq!(results.len(), 1);
    assert_eq!(results[0], (timestamp, 42.0));
    // A series that was never written is an error, not an empty result.
    let empty_series = "no_data_series";
    let results = db.query(empty_series, 0..u64::MAX, None);
    assert!(matches!(results, Err(DbError::SeriesNotFound(_))));
    // A filter key the point does not carry matches nothing.
    let non_matching_tags = tags_from(&[("region", "nowhere")]);
    let results = db
        .query(series_name, 0..(timestamp + 1), Some(&non_matching_tags))
        .unwrap();
    assert_eq!(
        results.len(),
        0,
        "Query with non-matching tags should return empty results"
    );
    db.flush().unwrap();
    // Remove the on-disk test directory (other tests in this file do too).
    drop(db);
    let _ = fs::remove_dir_all(data_dir);
}
// Volume smoke test: insert 10k points spread round-robin across three tag
// sets, then check the tag-filtered counts are roughly one third each and a
// full-range query returns everything.
#[test]
fn test_large_number_of_points() {
    let data_dir = setup_test_dir("large_number_of_points");
    let config = DbConfig {
        flush_interval: Duration::from_millis(50),
        data_dir: data_dir.clone(),
        enable_segments: true,
        enable_wal: false,
        enable_snapshots: false,
        snapshot_interval: Duration::from_secs(60 * 60),
        ..DbConfig::default()
    };
    let db = DbCore::with_config(config).unwrap();
    let series_name = "large_series";
    let num_points = 10_000;
    println!("Inserting {} points...", num_points);
    let start_time = std::time::Instant::now();
    for i in 0..num_points {
        let timestamp = SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos() as u64
            + i as u64;
        let value = i as f64 * 0.1;
        // Round-robin across three region/host tag sets.
        let tags = if i % 3 == 0 {
            tags_from(&[("region", "us-east"), ("host", "server1")])
        } else if i % 3 == 1 {
            tags_from(&[("region", "us-west"), ("host", "server2")])
        } else {
            tags_from(&[("region", "eu-central"), ("host", "server3")])
        };
        db.insert(series_name, timestamp, value, tags).unwrap();
        // Yield occasionally so the clock advances between batches.
        if i % 1000 == 0 {
            thread::sleep(Duration::from_nanos(1));
        }
    }
    let insert_duration = start_time.elapsed();
    println!("Inserted {} points in {:?}", num_points, insert_duration);
    db.flush().unwrap();
    // NOTE(review): sampled *after* the inserts, so this exceeds every
    // inserted timestamp; it is only used to build the upper query bound.
    let min_timestamp = SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_nanos() as u64;
    let max_timestamp = min_timestamp + num_points as u64;
    let query_start = std::time::Instant::now();
    // Roughly one third of the points carry region=us-east.
    let us_east_filter = tags_from(&[("region", "us-east")]);
    let us_east_results = db
        .query(series_name, 0..max_timestamp, Some(&us_east_filter))
        .unwrap();
    assert!(
        us_east_results.len() > num_points / 4,
        "Expected at least 1/4 of points in us-east"
    );
    assert!(
        us_east_results.len() < num_points / 2,
        "Expected less than 1/2 of points in us-east"
    );
    // Two-tag filter that identifies the eu-central third.
    let eu_server3_filter = tags_from(&[("region", "eu-central"), ("host", "server3")]);
    let eu_server3_results = db
        .query(series_name, 0..max_timestamp, Some(&eu_server3_filter))
        .unwrap();
    assert!(
        eu_server3_results.len() > num_points / 4,
        "Expected at least 1/4 of points for eu-central+server3"
    );
    assert!(
        eu_server3_results.len() < num_points / 2,
        "Expected less than 1/2 of points for eu-central+server3"
    );
    // Unfiltered query must return every inserted point.
    let all_results = db.query(series_name, 0..max_timestamp, None).unwrap();
    assert_eq!(
        all_results.len(),
        num_points,
        "All points should be returned in full query"
    );
    let query_duration = query_start.elapsed();
    println!("Queried {} points in {:?}", num_points, query_duration);
    // Remove the on-disk test directory (other tests in this file do too).
    drop(db);
    let _ = fs::remove_dir_all(data_dir);
}
// Tag-filtering over a varied catalogue of tag sets: deep hierarchies,
// HTTP-style tags, empty values, unicode values, and a ten-tag point.
#[test]
fn test_specific_tag_combinations() {
    let data_dir = setup_test_dir("specific_tag_combinations");
    let config = DbConfig {
        flush_interval: Duration::from_millis(50),
        data_dir: data_dir.clone(),
        enable_segments: true,
        enable_wal: false,
        enable_snapshots: false,
        snapshot_interval: Duration::from_secs(60 * 60),
        ..DbConfig::default()
    };
    let db = DbCore::with_config(config).unwrap();
    let series_name = "tag_combinations";
    // One (tag set, value) pair per point to insert.
    let test_data = vec![
        (
            tags_from(&[
                ("region", "us-east"),
                ("datacenter", "dc1"),
                ("rack", "r1"),
                ("host", "h1"),
                ("service", "api"),
            ]),
            100.0,
        ),
        (
            tags_from(&[
                ("region", "us-east"),
                ("datacenter", "dc1"),
                ("rack", "r1"),
                ("host", "h2"),
                ("service", "db"),
            ]),
            200.0,
        ),
        (
            tags_from(&[
                ("region", "us-east"),
                ("datacenter", "dc1"),
                ("rack", "r2"),
                ("host", "h3"),
                ("service", "api"),
            ]),
            300.0,
        ),
        (
            tags_from(&[
                ("region", "us-east"),
                ("datacenter", "dc2"),
                ("rack", "r3"),
                ("host", "h4"),
                ("service", "cache"),
            ]),
            400.0,
        ),
        (
            tags_from(&[
                ("region", "us-west"),
                ("datacenter", "dc3"),
                ("rack", "r4"),
                ("host", "h5"),
                ("service", "api"),
            ]),
            500.0,
        ),
        (
            tags_from(&[
                ("region", "us-west"),
                ("datacenter", "dc3"),
                ("rack", "r4"),
                ("host", "h6"),
                ("service", "db"),
            ]),
            600.0,
        ),
        (
            tags_from(&[("os", "linux"), ("version", "ubuntu"), ("kernel", "5.4")]),
            700.0,
        ),
        (
            tags_from(&[("os", "linux"), ("version", "centos"), ("kernel", "4.18")]),
            800.0,
        ),
        (
            tags_from(&[
                ("os", "windows"),
                ("version", "server2019"),
                ("kernel", "10.0"),
            ]),
            900.0,
        ),
        (
            tags_from(&[
                ("user-agent", "Mozilla/5.0"),
                ("path", "/api/v1/users"),
                ("status", "200"),
            ]),
            1000.0,
        ),
        // "你好世界" ("hello world"): repaired from a mojibake-damaged literal;
        // exercises multi-byte UTF-8 tag values.
        (
            tags_from(&[("tag_with_empty_value", ""), ("unicode", "你好世界")]),
            1100.0,
        ),
        (
            tags_from(&[
                ("t1", "v1"),
                ("t2", "v2"),
                ("t3", "v3"),
                ("t4", "v4"),
                ("t5", "v5"),
                ("t6", "v6"),
                ("t7", "v7"),
                ("t8", "v8"),
                ("t9", "v9"),
                ("t10", "v10"),
            ]),
            1200.0,
        ),
    ];
    // Insert each point with a unique, strictly increasing timestamp.
    for (i, (tags, value)) in test_data.iter().enumerate() {
        let timestamp = SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos() as u64
            + i as u64;
        db.insert(series_name, timestamp, *value, tags.clone())
            .unwrap();
        thread::sleep(Duration::from_nanos(1));
    }
    db.flush().unwrap();
    // Two-tag filter: the three us-east/dc1 points.
    let region_dc_filter = tags_from(&[("region", "us-east"), ("datacenter", "dc1")]);
    let region_dc_results = db
        .query(series_name, 0..u64::MAX, Some(&region_dc_filter))
        .unwrap();
    assert_eq!(
        region_dc_results.len(),
        3,
        "Expected 3 points matching us-east + dc1"
    );
    // Single-tag filter crossing both regions.
    let service_filter = tags_from(&[("service", "api")]);
    let service_results = db
        .query(series_name, 0..u64::MAX, Some(&service_filter))
        .unwrap();
    assert_eq!(
        service_results.len(),
        3,
        "Expected 3 points with service=api"
    );
    // Conjunctive filter narrowing linux to one kernel version.
    let os_kernel_filter = tags_from(&[("os", "linux"), ("kernel", "5.4")]);
    let os_kernel_results = db
        .query(series_name, 0..u64::MAX, Some(&os_kernel_filter))
        .unwrap();
    assert_eq!(
        os_kernel_results.len(),
        1,
        "Expected 1 point with os=linux + kernel=5.4"
    );
    // HTTP-style tag value.
    let special_filter = tags_from(&[("status", "200")]);
    let special_results = db
        .query(series_name, 0..u64::MAX, Some(&special_filter))
        .unwrap();
    assert_eq!(special_results.len(), 1, "Expected 1 point with status=200");
    // A tag whose value is the empty string is still filterable.
    let empty_value_filter = tags_from(&[("tag_with_empty_value", "")]);
    let empty_value_results = db
        .query(series_name, 0..u64::MAX, Some(&empty_value_filter))
        .unwrap();
    assert_eq!(
        empty_value_results.len(),
        1,
        "Expected 1 point with empty tag value"
    );
    // Subset of the ten-tag point's tags still matches it.
    let complex_filter = tags_from(&[("t1", "v1"), ("t5", "v5"), ("t10", "v10")]);
    let complex_results = db
        .query(series_name, 0..u64::MAX, Some(&complex_filter))
        .unwrap();
    assert_eq!(
        complex_results.len(),
        1,
        "Expected 1 point with multiple specific tags"
    );
    // A key no point carries matches nothing.
    let nonexistent_filter = tags_from(&[("nonexistent", "value")]);
    let nonexistent_results = db
        .query(series_name, 0..u64::MAX, Some(&nonexistent_filter))
        .unwrap();
    assert_eq!(
        nonexistent_results.len(),
        0,
        "Expected 0 points with non-existent tag"
    );
    // Remove the on-disk test directory (other tests in this file do too).
    drop(db);
    let _ = fs::remove_dir_all(data_dir);
}