term_guard/repository/query.rs
1//! Query builder for filtering and retrieving metrics from repositories.
2
3use async_trait::async_trait;
4use std::collections::HashMap;
5use std::sync::Arc;
6use tracing::instrument;
7
8use crate::analyzers::context::AnalyzerContext;
9use crate::error::{Result, TermError};
10
11use super::{MetricsRepository, ResultKey};
12
13/// Builder for constructing queries against a metrics repository.
14///
15/// `MetricsQuery` provides a fluent API for filtering metrics by various criteria
16/// including time ranges, tags, and analyzer types. Queries are constructed
17/// incrementally and executed asynchronously.
18///
19/// # Example
20///
21/// ```rust,ignore
22/// use term_guard::repository::MetricsQuery;
23///
24/// let results = repository.load().await
25/// .after(start_timestamp)
26/// .before(end_timestamp)
27/// .with_tag("environment", "production")
28/// .for_analyzers(vec!["completeness", "size"])
29/// .execute()
30/// .await?;
31///
32/// for (key, context) in results {
33/// println!("Metrics at {}: {:?}", key.timestamp, context.all_metrics());
34/// }
35/// ```
36pub struct MetricsQuery {
37 /// The repository to query against.
38 repository: Arc<dyn MetricsRepository>,
39
40 /// Filter for metrics before this timestamp (exclusive).
41 before: Option<i64>,
42
43 /// Filter for metrics after this timestamp (inclusive).
44 after: Option<i64>,
45
46 /// Filter for metrics with matching tags.
47 tags: HashMap<String, String>,
48
49 /// Filter for specific analyzer types.
50 analyzers: Option<Vec<String>>,
51
52 /// Maximum number of results to return.
53 limit: Option<usize>,
54
55 /// Offset for pagination.
56 offset: Option<usize>,
57
58 /// Sort order for results.
59 sort_order: SortOrder,
60}
61
/// Timestamp ordering for query results.
///
/// Queries created with `MetricsQuery::new` start out as [`SortOrder::Descending`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SortOrder {
    /// Oldest first: ascending by timestamp.
    Ascending,
    /// Newest first: descending by timestamp.
    Descending,
}
70
71impl MetricsQuery {
72 /// Creates a new query for the given repository.
73 ///
74 /// # Arguments
75 ///
76 /// * `repository` - The repository to query against
77 pub fn new(repository: Arc<dyn MetricsRepository>) -> Self {
78 Self {
79 repository,
80 before: None,
81 after: None,
82 tags: HashMap::new(),
83 analyzers: None,
84 limit: None,
85 offset: None,
86 sort_order: SortOrder::Descending,
87 }
88 }
89
90 /// Filters results to metrics before the specified timestamp.
91 ///
92 /// # Arguments
93 ///
94 /// * `timestamp` - Unix timestamp in milliseconds (exclusive)
95 ///
96 /// # Example
97 ///
98 /// ```rust,ignore
99 /// let end_time = chrono::Utc::now().timestamp_millis();
100 /// let query = repository.load().await.before(end_time);
101 /// ```
102 pub fn before(mut self, timestamp: i64) -> Self {
103 self.before = Some(timestamp);
104 self
105 }
106
107 /// Filters results to metrics after the specified timestamp.
108 ///
109 /// # Arguments
110 ///
111 /// * `timestamp` - Unix timestamp in milliseconds (inclusive)
112 ///
113 /// # Example
114 ///
115 /// ```rust,ignore
116 /// let start_time = (chrono::Utc::now() - chrono::Duration::days(7)).timestamp_millis();
117 /// let query = repository.load().await.after(start_time);
118 /// ```
119 pub fn after(mut self, timestamp: i64) -> Self {
120 self.after = Some(timestamp);
121 self
122 }
123
124 /// Filters results to metrics within a time range.
125 ///
126 /// # Arguments
127 ///
128 /// * `start` - Start timestamp in milliseconds (inclusive)
129 /// * `end` - End timestamp in milliseconds (exclusive)
130 ///
131 /// # Example
132 ///
133 /// ```rust,ignore
134 /// let query = repository.load().await.between(start_time, end_time);
135 /// ```
136 pub fn between(mut self, start: i64, end: i64) -> Self {
137 self.after = Some(start);
138 self.before = Some(end);
139 self
140 }
141
142 /// Filters results to metrics with a specific tag.
143 ///
144 /// # Arguments
145 ///
146 /// * `key` - The tag key
147 /// * `value` - The tag value
148 ///
149 /// # Example
150 ///
151 /// ```rust,ignore
152 /// let query = repository.load().await
153 /// .with_tag("environment", "production")
154 /// .with_tag("region", "us-west-2");
155 /// ```
156 pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
157 self.tags.insert(key.into(), value.into());
158 self
159 }
160
161 /// Filters results to metrics with multiple tags.
162 ///
163 /// # Arguments
164 ///
165 /// * `tags` - Iterator of (key, value) pairs
166 ///
167 /// # Example
168 ///
169 /// ```rust,ignore
170 /// let tags = vec![
171 /// ("environment", "production"),
172 /// ("dataset", "users"),
173 /// ];
174 /// let query = repository.load().await.with_tags(tags);
175 /// ```
176 pub fn with_tags<I, K, V>(mut self, tags: I) -> Self
177 where
178 I: IntoIterator<Item = (K, V)>,
179 K: Into<String>,
180 V: Into<String>,
181 {
182 for (k, v) in tags {
183 self.tags.insert(k.into(), v.into());
184 }
185 self
186 }
187
188 /// Filters results to specific analyzer types.
189 ///
190 /// Only metrics from the specified analyzers will be included in the results.
191 ///
192 /// # Arguments
193 ///
194 /// * `analyzers` - List of analyzer names
195 ///
196 /// # Example
197 ///
198 /// ```rust,ignore
199 /// let query = repository.load().await
200 /// .for_analyzers(vec!["completeness", "size", "mean"]);
201 /// ```
202 pub fn for_analyzers<I, S>(mut self, analyzers: I) -> Self
203 where
204 I: IntoIterator<Item = S>,
205 S: Into<String>,
206 {
207 self.analyzers = Some(analyzers.into_iter().map(|s| s.into()).collect());
208 self
209 }
210
211 /// Limits the number of results returned.
212 ///
213 /// # Arguments
214 ///
215 /// * `limit` - Maximum number of results
216 ///
217 /// # Example
218 ///
219 /// ```rust,ignore
220 /// let query = repository.load().await.limit(100);
221 /// ```
222 pub fn limit(mut self, limit: usize) -> Self {
223 self.limit = Some(limit);
224 self
225 }
226
227 /// Validates query parameters for correctness.
228 ///
229 /// # Returns
230 ///
231 /// Returns an error if the query parameters are invalid.
232 pub fn validate(&self) -> Result<()> {
233 // Validate time range
234 if let (Some(after), Some(before)) = (self.after, self.before) {
235 if after >= before {
236 return Err(TermError::invalid_repository_query(
237 "Invalid time range: 'after' timestamp must be less than 'before' timestamp",
238 format!("after: {after}, before: {before}"),
239 ));
240 }
241 }
242
243 // Validate limit
244 if let Some(limit) = self.limit {
245 if limit == 0 {
246 return Err(TermError::invalid_repository_query(
247 "Limit must be greater than 0",
248 format!("limit: {limit}"),
249 ));
250 }
251 if limit > 1_000_000 {
252 return Err(TermError::invalid_repository_query(
253 "Limit too large (max: 1,000,000)",
254 format!("limit: {limit}"),
255 ));
256 }
257 }
258
259 // Validate tag keys and values
260 for (key, value) in &self.tags {
261 if key.is_empty() {
262 return Err(TermError::invalid_repository_query(
263 "Tag key cannot be empty",
264 format!("tag: '{key}' = '{value}'"),
265 ));
266 }
267 if key.len() > 256 {
268 return Err(TermError::invalid_repository_query(
269 "Tag key too long (max: 256 characters)",
270 format!("tag: '{key}' ({} chars)", key.len()),
271 ));
272 }
273 if value.len() > 1024 {
274 return Err(TermError::invalid_repository_query(
275 "Tag value too long (max: 1024 characters)",
276 format!("tag: '{key}' = '{value}' ({} chars)", value.len()),
277 ));
278 }
279 }
280
281 // Validate analyzer names
282 if let Some(ref analyzers) = self.analyzers {
283 if analyzers.is_empty() {
284 return Err(TermError::invalid_repository_query(
285 "Analyzer list cannot be empty (use None instead)",
286 "analyzers: []".to_string(),
287 ));
288 }
289 for analyzer in analyzers {
290 if analyzer.is_empty() {
291 return Err(TermError::invalid_repository_query(
292 "Analyzer name cannot be empty",
293 format!("analyzers: {analyzers:?}"),
294 ));
295 }
296 }
297 }
298
299 Ok(())
300 }
301
302 /// Sets the offset for pagination.
303 ///
304 /// # Arguments
305 ///
306 /// * `offset` - Number of results to skip
307 ///
308 /// # Example
309 ///
310 /// ```rust,ignore
311 /// // Get results 100-200
312 /// let query = repository.load().await.offset(100).limit(100);
313 /// ```
314 pub fn offset(mut self, offset: usize) -> Self {
315 self.offset = Some(offset);
316 self
317 }
318
319 /// Sets the sort order for results.
320 ///
321 /// # Arguments
322 ///
323 /// * `order` - The sort order to use
324 ///
325 /// # Example
326 ///
327 /// ```rust,ignore
328 /// use term_guard::repository::query::SortOrder;
329 ///
330 /// let query = repository.load().await.sort(SortOrder::Ascending);
331 /// ```
332 pub fn sort(mut self, order: SortOrder) -> Self {
333 self.sort_order = order;
334 self
335 }
336
337 /// Executes the query and returns the results.
338 ///
339 /// # Returns
340 ///
341 /// A vector of (ResultKey, AnalyzerContext) pairs matching the query criteria,
342 /// sorted by timestamp according to the specified sort order.
343 ///
344 /// # Errors
345 ///
346 /// Returns an error if the query execution fails (e.g., I/O error, invalid query).
347 ///
348 /// # Example
349 ///
350 /// ```rust,ignore
351 /// let results = repository.load().await
352 /// .after(start_time)
353 /// .with_tag("environment", "production")
354 /// .execute()
355 /// .await?;
356 ///
357 /// for (key, context) in results {
358 /// println!("Timestamp: {}", key.timestamp);
359 /// println!("Metrics: {:?}", context.all_metrics());
360 /// }
361 /// ```
362 #[instrument(skip(self), fields(
363 query.filters.time_range = format_args!("{:?}-{:?}", self.after, self.before),
364 query.filters.tag_count = self.tags.len(),
365 query.limit = self.limit,
366 query.offset = self.offset
367 ))]
368 pub async fn execute(self) -> Result<Vec<(ResultKey, AnalyzerContext)>> {
369 // Validate query parameters before execution
370 self.validate()?;
371
372 // This is a default implementation that can be overridden by specific repositories
373 // for more efficient querying. For now, we'll load all keys and filter in memory.
374
375 let all_keys = self.repository.list_keys().await?;
376
377 let mut filtered_results = Vec::new();
378
379 for key in all_keys {
380 // Apply time filters
381 if let Some(before) = self.before {
382 if key.timestamp >= before {
383 continue;
384 }
385 }
386
387 if let Some(after) = self.after {
388 if key.timestamp < after {
389 continue;
390 }
391 }
392
393 // Apply tag filters
394 if !key.matches_tags(&self.tags) {
395 continue;
396 }
397
398 // Load the context for this key
399 // Try to get the actual context from the repository
400 let context = match self.repository.get(&key).await {
401 Ok(Some(ctx)) => ctx,
402 _ => AnalyzerContext::new(),
403 };
404
405 // Apply analyzer filter if specified
406 if let Some(ref analyzers) = self.analyzers {
407 // Check if any of the requested analyzers have metrics
408 let has_analyzer = analyzers
409 .iter()
410 .any(|analyzer| !context.get_analyzer_metrics(analyzer).is_empty());
411
412 if !has_analyzer && !context.all_metrics().is_empty() {
413 continue;
414 }
415 }
416
417 filtered_results.push((key, context));
418 }
419
420 // Sort results
421 match self.sort_order {
422 SortOrder::Ascending => {
423 filtered_results.sort_by_key(|(key, _)| key.timestamp);
424 }
425 SortOrder::Descending => {
426 filtered_results.sort_by_key(|(key, _)| -key.timestamp);
427 }
428 }
429
430 // Apply pagination
431 if let Some(offset) = self.offset {
432 filtered_results = filtered_results.into_iter().skip(offset).collect();
433 }
434
435 if let Some(limit) = self.limit {
436 filtered_results.truncate(limit);
437 }
438
439 Ok(filtered_results)
440 }
441
442 /// Returns a count of metrics matching the query criteria without loading them.
443 ///
444 /// This is more efficient than executing the query and counting results
445 /// when only the count is needed.
446 ///
447 /// # Errors
448 ///
449 /// Returns an error if the count operation fails.
450 #[instrument(skip(self), fields(
451 query.filters.time_range = format_args!("{:?}-{:?}", self.after, self.before),
452 query.filters.tag_count = self.tags.len()
453 ))]
454 pub async fn count(self) -> Result<usize> {
455 // Default implementation executes the query and counts results
456 // Specific repositories can override this for efficiency
457 let results = self.execute().await?;
458 Ok(results.len())
459 }
460
461 /// Checks if any metrics match the query criteria.
462 ///
463 /// # Returns
464 ///
465 /// Returns `true` if at least one metric matches, `false` otherwise.
466 ///
467 /// # Errors
468 ///
469 /// Returns an error if the check operation fails.
470 #[instrument(skip(self), fields(
471 query.filters.time_range = format_args!("{:?}-{:?}", self.after, self.before),
472 query.filters.tag_count = self.tags.len()
473 ))]
474 pub async fn exists(self) -> Result<bool> {
475 let limited = self.limit(1);
476 let results = limited.execute().await?;
477 Ok(!results.is_empty())
478 }
479
480 /// Accessor methods for DataFusion integration
481 pub fn get_before(&self) -> Option<i64> {
482 self.before
483 }
484
485 pub fn get_after(&self) -> Option<i64> {
486 self.after
487 }
488
489 pub fn get_tags(&self) -> &HashMap<String, String> {
490 &self.tags
491 }
492
493 pub fn get_analyzers(&self) -> &Option<Vec<String>> {
494 &self.analyzers
495 }
496
497 pub fn get_limit(&self) -> Option<usize> {
498 self.limit
499 }
500
501 pub fn get_offset(&self) -> Option<usize> {
502 self.offset
503 }
504
505 pub fn get_sort_order(&self) -> SortOrder {
506 self.sort_order
507 }
508
509 pub fn is_ascending(&self) -> bool {
510 self.sort_order == SortOrder::Ascending
511 }
512}
513
514/// Extension trait for repositories to provide custom query execution.
515#[async_trait]
516pub trait QueryExecutor: MetricsRepository {
517 /// Executes a query with repository-specific optimizations.
518 ///
519 /// Repositories can implement this method to provide more efficient
520 /// query execution than the default in-memory filtering.
521 #[instrument(skip(self, query))]
522 async fn execute_query(
523 &self,
524 query: MetricsQuery,
525 ) -> Result<Vec<(ResultKey, AnalyzerContext)>> {
526 // Default implementation delegates to the query's execute method
527 query.execute().await
528 }
529}
530
#[cfg(test)]
mod tests {
    use super::*;

    /// Repository stub: `list_keys` returns four fixed keys (timestamps 1000..=4000),
    /// `save`/`delete` do nothing, and `load` returns an unfiltered query over a fresh stub.
    struct MockRepository;

    #[async_trait]
    impl MetricsRepository for MockRepository {
        async fn save(&self, _key: ResultKey, _metrics: AnalyzerContext) -> Result<()> {
            Ok(())
        }

        async fn load(&self) -> MetricsQuery {
            MetricsQuery::new(Arc::new(MockRepository))
        }

        async fn delete(&self, _key: ResultKey) -> Result<()> {
            Ok(())
        }

        async fn list_keys(&self) -> Result<Vec<ResultKey>> {
            Ok(vec![
                ResultKey::new(1000).with_tag("env", "prod"),
                ResultKey::new(2000).with_tag("env", "staging"),
                ResultKey::new(3000)
                    .with_tag("env", "prod")
                    .with_tag("version", "1.0"),
                ResultKey::new(4000)
                    .with_tag("env", "prod")
                    .with_tag("version", "2.0"),
            ])
        }
    }

    /// Collects result timestamps in order, for compact assertions.
    fn timestamps(results: &[(ResultKey, AnalyzerContext)]) -> Vec<i64> {
        results.iter().map(|(key, _)| key.timestamp).collect()
    }

    #[tokio::test]
    async fn test_query_time_filters() {
        let repo = Arc::new(MockRepository);
        let query = MetricsQuery::new(repo.clone()).after(1500).before(3500);

        let results = query.execute().await.unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].0.timestamp, 3000);
        assert_eq!(results[1].0.timestamp, 2000);
    }

    #[tokio::test]
    async fn test_query_between() {
        let repo = Arc::new(MockRepository);
        let results = MetricsQuery::new(repo)
            .between(2000, 4000)
            .sort(SortOrder::Ascending)
            .execute()
            .await
            .unwrap();
        // `start` is inclusive, `end` is exclusive.
        assert_eq!(timestamps(&results), vec![2000, 3000]);
    }

    #[tokio::test]
    async fn test_query_tag_filters() {
        let repo = Arc::new(MockRepository);
        let query = MetricsQuery::new(repo.clone()).with_tag("env", "prod");

        let results = query.execute().await.unwrap();
        assert_eq!(results.len(), 3);
    }

    #[tokio::test]
    async fn test_query_multiple_tags() {
        let repo = Arc::new(MockRepository);
        let query = MetricsQuery::new(repo.clone())
            .with_tag("env", "prod")
            .with_tag("version", "1.0");

        let results = query.execute().await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0.timestamp, 3000);
    }

    #[tokio::test]
    async fn test_query_sort_order() {
        let repo = Arc::new(MockRepository);

        // Descending is the default.
        let results = MetricsQuery::new(repo.clone()).execute().await.unwrap();
        assert_eq!(timestamps(&results), vec![4000, 3000, 2000, 1000]);

        let results = MetricsQuery::new(repo.clone())
            .sort(SortOrder::Ascending)
            .execute()
            .await
            .unwrap();
        assert_eq!(timestamps(&results), vec![1000, 2000, 3000, 4000]);
    }

    #[tokio::test]
    async fn test_query_pagination() {
        let repo = Arc::new(MockRepository);
        let query = MetricsQuery::new(repo.clone())
            .sort(SortOrder::Ascending)
            .offset(1)
            .limit(2);

        let results = query.execute().await.unwrap();
        assert_eq!(timestamps(&results), vec![2000, 3000]);
    }

    #[tokio::test]
    async fn test_query_offset_past_end() {
        let repo = Arc::new(MockRepository);
        let results = MetricsQuery::new(repo).offset(10).execute().await.unwrap();
        assert!(results.is_empty());
    }

    #[tokio::test]
    async fn test_query_count() {
        let repo = Arc::new(MockRepository);

        assert_eq!(MetricsQuery::new(repo.clone()).count().await.unwrap(), 4);
        assert_eq!(
            MetricsQuery::new(repo.clone())
                .with_tag("env", "prod")
                .count()
                .await
                .unwrap(),
            3
        );
        // Pagination is honoured: offset first, then limit.
        assert_eq!(
            MetricsQuery::new(repo.clone())
                .with_tag("env", "prod")
                .offset(1)
                .limit(1)
                .count()
                .await
                .unwrap(),
            1
        );
        assert_eq!(
            MetricsQuery::new(repo.clone())
                .offset(2)
                .count()
                .await
                .unwrap(),
            2
        );
        assert_eq!(
            MetricsQuery::new(repo.clone())
                .offset(10)
                .count()
                .await
                .unwrap(),
            0
        );
        assert!(MetricsQuery::new(repo).limit(0).count().await.is_err());
    }

    #[tokio::test]
    async fn test_query_exists() {
        let repo = Arc::new(MockRepository);

        let exists = MetricsQuery::new(repo.clone())
            .with_tag("env", "prod")
            .exists()
            .await
            .unwrap();
        assert!(exists);

        let not_exists = MetricsQuery::new(repo.clone())
            .with_tag("env", "nonexistent")
            .exists()
            .await
            .unwrap();
        assert!(!not_exists);

        // The offset still applies: all three prod results are skipped.
        let skipped_all = MetricsQuery::new(repo)
            .with_tag("env", "prod")
            .offset(3)
            .exists()
            .await
            .unwrap();
        assert!(!skipped_all);
    }

    #[test]
    fn test_validate_rules() {
        let repo: Arc<dyn MetricsRepository> = Arc::new(MockRepository);
        let query = || MetricsQuery::new(repo.clone());

        assert!(query().validate().is_ok());
        assert!(query().between(1000, 2000).validate().is_ok());
        assert!(query().limit(1_000_000).validate().is_ok());

        // Empty or inverted time ranges.
        assert!(query().after(10).before(10).validate().is_err());
        assert!(query().after(20).before(10).validate().is_err());

        // Limit bounds.
        assert!(query().limit(0).validate().is_err());
        assert!(query().limit(1_000_001).validate().is_err());

        // Tag key/value constraints.
        assert!(query().with_tag("", "x").validate().is_err());
        assert!(query().with_tag("k".repeat(256), "x").validate().is_ok());
        assert!(query().with_tag("k".repeat(257), "x").validate().is_err());
        assert!(query().with_tag("k", "v".repeat(1024)).validate().is_ok());
        assert!(query().with_tag("k", "v".repeat(1025)).validate().is_err());

        // Analyzer list constraints.
        assert!(query().for_analyzers(Vec::<String>::new()).validate().is_err());
        assert!(query().for_analyzers(vec![""]).validate().is_err());
        assert!(query().for_analyzers(vec!["size"]).validate().is_ok());
    }

    #[tokio::test]
    async fn test_execute_rejects_invalid_query() {
        let repo = Arc::new(MockRepository);
        assert!(MetricsQuery::new(repo)
            .between(3000, 1000)
            .execute()
            .await
            .is_err());
    }
}