use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::instrument;
use crate::analyzers::context::AnalyzerContext;
use crate::error::{Result, TermError};
use super::{MetricsRepository, ResultKey};
/// Builder-style query over a [`MetricsRepository`]: accumulates time, tag,
/// and analyzer filters plus pagination and sort options, then resolves them
/// against the repository in `execute`.
pub struct MetricsQuery {
// Backing store the query runs against.
repository: Arc<dyn MetricsRepository>,
// Exclusive upper bound on key timestamps (keep `timestamp < before`).
before: Option<i64>,
// Inclusive lower bound on key timestamps (keep `timestamp >= after`).
after: Option<i64>,
// Every (key, value) pair here must match a result key's tags.
tags: HashMap<String, String>,
// When Some, restricts results by analyzer name; None means no analyzer filter.
analyzers: Option<Vec<String>>,
// Maximum number of results returned; applied after sorting and offset.
limit: Option<usize>,
// Number of leading results skipped; applied after sorting, before limit.
offset: Option<usize>,
// Ordering of results by key timestamp; `new` defaults this to Descending.
sort_order: SortOrder,
}
/// Ordering of query results by key timestamp.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortOrder {
    /// Oldest first (smallest timestamp first).
    Ascending,
    /// Newest first (largest timestamp first).
    Descending,
}

// `MetricsQuery::new` hard-codes Descending as the default sort; expose the
// same choice through the standard `Default` trait so callers can rely on it.
impl Default for SortOrder {
    fn default() -> Self {
        SortOrder::Descending
    }
}
impl MetricsQuery {
pub fn new(repository: Arc<dyn MetricsRepository>) -> Self {
Self {
repository,
before: None,
after: None,
tags: HashMap::new(),
analyzers: None,
limit: None,
offset: None,
sort_order: SortOrder::Descending,
}
}
pub fn before(mut self, timestamp: i64) -> Self {
self.before = Some(timestamp);
self
}
pub fn after(mut self, timestamp: i64) -> Self {
self.after = Some(timestamp);
self
}
pub fn between(mut self, start: i64, end: i64) -> Self {
self.after = Some(start);
self.before = Some(end);
self
}
pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.tags.insert(key.into(), value.into());
self
}
pub fn with_tags<I, K, V>(mut self, tags: I) -> Self
where
I: IntoIterator<Item = (K, V)>,
K: Into<String>,
V: Into<String>,
{
for (k, v) in tags {
self.tags.insert(k.into(), v.into());
}
self
}
pub fn for_analyzers<I, S>(mut self, analyzers: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.analyzers = Some(analyzers.into_iter().map(|s| s.into()).collect());
self
}
pub fn limit(mut self, limit: usize) -> Self {
self.limit = Some(limit);
self
}
pub fn validate(&self) -> Result<()> {
if let (Some(after), Some(before)) = (self.after, self.before) {
if after >= before {
return Err(TermError::invalid_repository_query(
"Invalid time range: 'after' timestamp must be less than 'before' timestamp",
format!("after: {after}, before: {before}"),
));
}
}
if let Some(limit) = self.limit {
if limit == 0 {
return Err(TermError::invalid_repository_query(
"Limit must be greater than 0",
format!("limit: {limit}"),
));
}
if limit > 1_000_000 {
return Err(TermError::invalid_repository_query(
"Limit too large (max: 1,000,000)",
format!("limit: {limit}"),
));
}
}
for (key, value) in &self.tags {
if key.is_empty() {
return Err(TermError::invalid_repository_query(
"Tag key cannot be empty",
format!("tag: '{key}' = '{value}'"),
));
}
if key.len() > 256 {
return Err(TermError::invalid_repository_query(
"Tag key too long (max: 256 characters)",
format!("tag: '{key}' ({} chars)", key.len()),
));
}
if value.len() > 1024 {
return Err(TermError::invalid_repository_query(
"Tag value too long (max: 1024 characters)",
format!("tag: '{key}' = '{value}' ({} chars)", value.len()),
));
}
}
if let Some(ref analyzers) = self.analyzers {
if analyzers.is_empty() {
return Err(TermError::invalid_repository_query(
"Analyzer list cannot be empty (use None instead)",
"analyzers: []".to_string(),
));
}
for analyzer in analyzers {
if analyzer.is_empty() {
return Err(TermError::invalid_repository_query(
"Analyzer name cannot be empty",
format!("analyzers: {analyzers:?}"),
));
}
}
}
Ok(())
}
pub fn offset(mut self, offset: usize) -> Self {
self.offset = Some(offset);
self
}
pub fn sort(mut self, order: SortOrder) -> Self {
self.sort_order = order;
self
}
#[instrument(skip(self), fields(
query.filters.time_range = format_args!("{:?}-{:?}", self.after, self.before),
query.filters.tag_count = self.tags.len(),
query.limit = self.limit,
query.offset = self.offset
))]
pub async fn execute(self) -> Result<Vec<(ResultKey, AnalyzerContext)>> {
self.validate()?;
let all_keys = self.repository.list_keys().await?;
let mut filtered_results = Vec::new();
for key in all_keys {
if let Some(before) = self.before {
if key.timestamp >= before {
continue;
}
}
if let Some(after) = self.after {
if key.timestamp < after {
continue;
}
}
if !key.matches_tags(&self.tags) {
continue;
}
let context = match self.repository.get(&key).await {
Ok(Some(ctx)) => ctx,
_ => AnalyzerContext::new(),
};
if let Some(ref analyzers) = self.analyzers {
let has_analyzer = analyzers
.iter()
.any(|analyzer| !context.get_analyzer_metrics(analyzer).is_empty());
if !has_analyzer && !context.all_metrics().is_empty() {
continue;
}
}
filtered_results.push((key, context));
}
match self.sort_order {
SortOrder::Ascending => {
filtered_results.sort_by_key(|(key, _)| key.timestamp);
}
SortOrder::Descending => {
filtered_results.sort_by_key(|(key, _)| -key.timestamp);
}
}
if let Some(offset) = self.offset {
filtered_results = filtered_results.into_iter().skip(offset).collect();
}
if let Some(limit) = self.limit {
filtered_results.truncate(limit);
}
Ok(filtered_results)
}
#[instrument(skip(self), fields(
query.filters.time_range = format_args!("{:?}-{:?}", self.after, self.before),
query.filters.tag_count = self.tags.len()
))]
pub async fn count(self) -> Result<usize> {
let results = self.execute().await?;
Ok(results.len())
}
#[instrument(skip(self), fields(
query.filters.time_range = format_args!("{:?}-{:?}", self.after, self.before),
query.filters.tag_count = self.tags.len()
))]
pub async fn exists(self) -> Result<bool> {
let limited = self.limit(1);
let results = limited.execute().await?;
Ok(!results.is_empty())
}
pub fn get_before(&self) -> Option<i64> {
self.before
}
pub fn get_after(&self) -> Option<i64> {
self.after
}
pub fn get_tags(&self) -> &HashMap<String, String> {
&self.tags
}
pub fn get_analyzers(&self) -> &Option<Vec<String>> {
&self.analyzers
}
pub fn get_limit(&self) -> Option<usize> {
self.limit
}
pub fn get_offset(&self) -> Option<usize> {
self.offset
}
pub fn get_sort_order(&self) -> SortOrder {
self.sort_order
}
pub fn is_ascending(&self) -> bool {
self.sort_order == SortOrder::Ascending
}
}
/// Query-execution hook for repositories: any [`MetricsRepository`] can run a
/// pre-built [`MetricsQuery`] through this trait's default method.
#[async_trait]
pub trait QueryExecutor: MetricsRepository {
    /// Validates and runs `query`, returning the matching (key, context) pairs.
    #[instrument(skip(self, query))]
    async fn execute_query(
        &self,
        query: MetricsQuery,
    ) -> Result<Vec<(ResultKey, AnalyzerContext)>> {
        let results = query.execute().await?;
        Ok(results)
    }
}
#[cfg(test)]
mod tests {
    // `super::*` already brings `MetricsRepository` into scope; the previous
    // explicit `use super::MetricsRepository;` was redundant and removed.
    use super::*;

    /// In-memory repository exposing a fixed set of four keys, used to
    /// exercise query filtering, sorting, and pagination without storage.
    struct MockRepository;

    #[async_trait]
    impl MetricsRepository for MockRepository {
        async fn save(&self, _key: ResultKey, _metrics: AnalyzerContext) -> Result<()> {
            Ok(())
        }
        async fn load(&self) -> MetricsQuery {
            MetricsQuery::new(Arc::new(MockRepository))
        }
        async fn delete(&self, _key: ResultKey) -> Result<()> {
            Ok(())
        }
        async fn list_keys(&self) -> Result<Vec<ResultKey>> {
            // Timestamps 1000..=4000: three "prod" keys, one "staging".
            Ok(vec![
                ResultKey::new(1000).with_tag("env", "prod"),
                ResultKey::new(2000).with_tag("env", "staging"),
                ResultKey::new(3000)
                    .with_tag("env", "prod")
                    .with_tag("version", "1.0"),
                ResultKey::new(4000)
                    .with_tag("env", "prod")
                    .with_tag("version", "2.0"),
            ])
        }
    }

    /// `after` is inclusive and `before` exclusive, and the default sort is
    /// descending (3000 before 2000).
    #[tokio::test]
    async fn test_query_time_filters() {
        let repo = Arc::new(MockRepository);
        let query = MetricsQuery::new(repo.clone()).after(1500).before(3500);
        let results = query.execute().await.unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].0.timestamp, 3000);
        assert_eq!(results[1].0.timestamp, 2000);
    }

    /// A single tag filter keeps exactly the keys carrying that tag value.
    #[tokio::test]
    async fn test_query_tag_filters() {
        let repo = Arc::new(MockRepository);
        let query = MetricsQuery::new(repo.clone()).with_tag("env", "prod");
        let results = query.execute().await.unwrap();
        assert_eq!(results.len(), 3);
    }

    /// Multiple tag filters are conjunctive: all must match.
    #[tokio::test]
    async fn test_query_multiple_tags() {
        let repo = Arc::new(MockRepository);
        let query = MetricsQuery::new(repo.clone())
            .with_tag("env", "prod")
            .with_tag("version", "1.0");
        let results = query.execute().await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0.timestamp, 3000);
    }

    /// Default order is newest-first; `sort(Ascending)` reverses it.
    #[tokio::test]
    async fn test_query_sort_order() {
        let repo = Arc::new(MockRepository);
        let query = MetricsQuery::new(repo.clone());
        let results = query.execute().await.unwrap();
        assert_eq!(results[0].0.timestamp, 4000);
        assert_eq!(results[3].0.timestamp, 1000);
        let query = MetricsQuery::new(repo.clone()).sort(SortOrder::Ascending);
        let results = query.execute().await.unwrap();
        assert_eq!(results[0].0.timestamp, 1000);
        assert_eq!(results[3].0.timestamp, 4000);
    }

    /// Offset is applied after sorting, limit after offset.
    #[tokio::test]
    async fn test_query_pagination() {
        let repo = Arc::new(MockRepository);
        let query = MetricsQuery::new(repo.clone())
            .sort(SortOrder::Ascending)
            .offset(1)
            .limit(2);
        let results = query.execute().await.unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].0.timestamp, 2000);
        assert_eq!(results[1].0.timestamp, 3000);
    }

    /// `exists` reports whether any key matches the filters.
    #[tokio::test]
    async fn test_query_exists() {
        let repo = Arc::new(MockRepository);
        let exists = MetricsQuery::new(repo.clone())
            .with_tag("env", "prod")
            .exists()
            .await
            .unwrap();
        assert!(exists);
        let not_exists = MetricsQuery::new(repo.clone())
            .with_tag("env", "nonexistent")
            .exists()
            .await
            .unwrap();
        assert!(!not_exists);
    }
}