term_guard/repository/
query.rs

1//! Query builder for filtering and retrieving metrics from repositories.
2
3use async_trait::async_trait;
4use std::collections::HashMap;
5use std::sync::Arc;
6use tracing::instrument;
7
8use crate::analyzers::context::AnalyzerContext;
9use crate::error::{Result, TermError};
10
11use super::{MetricsRepository, ResultKey};
12
13/// Builder for constructing queries against a metrics repository.
14///
15/// `MetricsQuery` provides a fluent API for filtering metrics by various criteria
16/// including time ranges, tags, and analyzer types. Queries are constructed
17/// incrementally and executed asynchronously.
18///
19/// # Example
20///
21/// ```rust,ignore
22/// use term_guard::repository::MetricsQuery;
23///
24/// let results = repository.load().await
25///     .after(start_timestamp)
26///     .before(end_timestamp)
27///     .with_tag("environment", "production")
28///     .for_analyzers(vec!["completeness", "size"])
29///     .execute()
30///     .await?;
31///
32/// for (key, context) in results {
33///     println!("Metrics at {}: {:?}", key.timestamp, context.all_metrics());
34/// }
35/// ```
36pub struct MetricsQuery {
37    /// The repository to query against.
38    repository: Arc<dyn MetricsRepository>,
39
40    /// Filter for metrics before this timestamp (exclusive).
41    before: Option<i64>,
42
43    /// Filter for metrics after this timestamp (inclusive).
44    after: Option<i64>,
45
46    /// Filter for metrics with matching tags.
47    tags: HashMap<String, String>,
48
49    /// Filter for specific analyzer types.
50    analyzers: Option<Vec<String>>,
51
52    /// Maximum number of results to return.
53    limit: Option<usize>,
54
55    /// Offset for pagination.
56    offset: Option<usize>,
57
58    /// Sort order for results.
59    sort_order: SortOrder,
60}
61
/// Direction in which query results are ordered by key timestamp.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SortOrder {
    /// Oldest first: timestamps increase through the result list.
    Ascending,
    /// Newest first: timestamps decrease through the result list.
    Descending,
}
70
71impl MetricsQuery {
72    /// Creates a new query for the given repository.
73    ///
74    /// # Arguments
75    ///
76    /// * `repository` - The repository to query against
77    pub fn new(repository: Arc<dyn MetricsRepository>) -> Self {
78        Self {
79            repository,
80            before: None,
81            after: None,
82            tags: HashMap::new(),
83            analyzers: None,
84            limit: None,
85            offset: None,
86            sort_order: SortOrder::Descending,
87        }
88    }
89
90    /// Filters results to metrics before the specified timestamp.
91    ///
92    /// # Arguments
93    ///
94    /// * `timestamp` - Unix timestamp in milliseconds (exclusive)
95    ///
96    /// # Example
97    ///
98    /// ```rust,ignore
99    /// let end_time = chrono::Utc::now().timestamp_millis();
100    /// let query = repository.load().await.before(end_time);
101    /// ```
102    pub fn before(mut self, timestamp: i64) -> Self {
103        self.before = Some(timestamp);
104        self
105    }
106
107    /// Filters results to metrics after the specified timestamp.
108    ///
109    /// # Arguments
110    ///
111    /// * `timestamp` - Unix timestamp in milliseconds (inclusive)
112    ///
113    /// # Example
114    ///
115    /// ```rust,ignore
116    /// let start_time = (chrono::Utc::now() - chrono::Duration::days(7)).timestamp_millis();
117    /// let query = repository.load().await.after(start_time);
118    /// ```
119    pub fn after(mut self, timestamp: i64) -> Self {
120        self.after = Some(timestamp);
121        self
122    }
123
124    /// Filters results to metrics within a time range.
125    ///
126    /// # Arguments
127    ///
128    /// * `start` - Start timestamp in milliseconds (inclusive)
129    /// * `end` - End timestamp in milliseconds (exclusive)
130    ///
131    /// # Example
132    ///
133    /// ```rust,ignore
134    /// let query = repository.load().await.between(start_time, end_time);
135    /// ```
136    pub fn between(mut self, start: i64, end: i64) -> Self {
137        self.after = Some(start);
138        self.before = Some(end);
139        self
140    }
141
142    /// Filters results to metrics with a specific tag.
143    ///
144    /// # Arguments
145    ///
146    /// * `key` - The tag key
147    /// * `value` - The tag value
148    ///
149    /// # Example
150    ///
151    /// ```rust,ignore
152    /// let query = repository.load().await
153    ///     .with_tag("environment", "production")
154    ///     .with_tag("region", "us-west-2");
155    /// ```
156    pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
157        self.tags.insert(key.into(), value.into());
158        self
159    }
160
161    /// Filters results to metrics with multiple tags.
162    ///
163    /// # Arguments
164    ///
165    /// * `tags` - Iterator of (key, value) pairs
166    ///
167    /// # Example
168    ///
169    /// ```rust,ignore
170    /// let tags = vec![
171    ///     ("environment", "production"),
172    ///     ("dataset", "users"),
173    /// ];
174    /// let query = repository.load().await.with_tags(tags);
175    /// ```
176    pub fn with_tags<I, K, V>(mut self, tags: I) -> Self
177    where
178        I: IntoIterator<Item = (K, V)>,
179        K: Into<String>,
180        V: Into<String>,
181    {
182        for (k, v) in tags {
183            self.tags.insert(k.into(), v.into());
184        }
185        self
186    }
187
188    /// Filters results to specific analyzer types.
189    ///
190    /// Only metrics from the specified analyzers will be included in the results.
191    ///
192    /// # Arguments
193    ///
194    /// * `analyzers` - List of analyzer names
195    ///
196    /// # Example
197    ///
198    /// ```rust,ignore
199    /// let query = repository.load().await
200    ///     .for_analyzers(vec!["completeness", "size", "mean"]);
201    /// ```
202    pub fn for_analyzers<I, S>(mut self, analyzers: I) -> Self
203    where
204        I: IntoIterator<Item = S>,
205        S: Into<String>,
206    {
207        self.analyzers = Some(analyzers.into_iter().map(|s| s.into()).collect());
208        self
209    }
210
211    /// Limits the number of results returned.
212    ///
213    /// # Arguments
214    ///
215    /// * `limit` - Maximum number of results
216    ///
217    /// # Example
218    ///
219    /// ```rust,ignore
220    /// let query = repository.load().await.limit(100);
221    /// ```
222    pub fn limit(mut self, limit: usize) -> Self {
223        self.limit = Some(limit);
224        self
225    }
226
227    /// Validates query parameters for correctness.
228    ///
229    /// # Returns
230    ///
231    /// Returns an error if the query parameters are invalid.
232    pub fn validate(&self) -> Result<()> {
233        // Validate time range
234        if let (Some(after), Some(before)) = (self.after, self.before) {
235            if after >= before {
236                return Err(TermError::invalid_repository_query(
237                    "Invalid time range: 'after' timestamp must be less than 'before' timestamp",
238                    format!("after: {after}, before: {before}"),
239                ));
240            }
241        }
242
243        // Validate limit
244        if let Some(limit) = self.limit {
245            if limit == 0 {
246                return Err(TermError::invalid_repository_query(
247                    "Limit must be greater than 0",
248                    format!("limit: {limit}"),
249                ));
250            }
251            if limit > 1_000_000 {
252                return Err(TermError::invalid_repository_query(
253                    "Limit too large (max: 1,000,000)",
254                    format!("limit: {limit}"),
255                ));
256            }
257        }
258
259        // Validate tag keys and values
260        for (key, value) in &self.tags {
261            if key.is_empty() {
262                return Err(TermError::invalid_repository_query(
263                    "Tag key cannot be empty",
264                    format!("tag: '{key}' = '{value}'"),
265                ));
266            }
267            if key.len() > 256 {
268                return Err(TermError::invalid_repository_query(
269                    "Tag key too long (max: 256 characters)",
270                    format!("tag: '{key}' ({} chars)", key.len()),
271                ));
272            }
273            if value.len() > 1024 {
274                return Err(TermError::invalid_repository_query(
275                    "Tag value too long (max: 1024 characters)",
276                    format!("tag: '{key}' = '{value}' ({} chars)", value.len()),
277                ));
278            }
279        }
280
281        // Validate analyzer names
282        if let Some(ref analyzers) = self.analyzers {
283            if analyzers.is_empty() {
284                return Err(TermError::invalid_repository_query(
285                    "Analyzer list cannot be empty (use None instead)",
286                    "analyzers: []".to_string(),
287                ));
288            }
289            for analyzer in analyzers {
290                if analyzer.is_empty() {
291                    return Err(TermError::invalid_repository_query(
292                        "Analyzer name cannot be empty",
293                        format!("analyzers: {analyzers:?}"),
294                    ));
295                }
296            }
297        }
298
299        Ok(())
300    }
301
302    /// Sets the offset for pagination.
303    ///
304    /// # Arguments
305    ///
306    /// * `offset` - Number of results to skip
307    ///
308    /// # Example
309    ///
310    /// ```rust,ignore
311    /// // Get results 100-200
312    /// let query = repository.load().await.offset(100).limit(100);
313    /// ```
314    pub fn offset(mut self, offset: usize) -> Self {
315        self.offset = Some(offset);
316        self
317    }
318
319    /// Sets the sort order for results.
320    ///
321    /// # Arguments
322    ///
323    /// * `order` - The sort order to use
324    ///
325    /// # Example
326    ///
327    /// ```rust,ignore
328    /// use term_guard::repository::query::SortOrder;
329    ///
330    /// let query = repository.load().await.sort(SortOrder::Ascending);
331    /// ```
332    pub fn sort(mut self, order: SortOrder) -> Self {
333        self.sort_order = order;
334        self
335    }
336
337    /// Executes the query and returns the results.
338    ///
339    /// # Returns
340    ///
341    /// A vector of (ResultKey, AnalyzerContext) pairs matching the query criteria,
342    /// sorted by timestamp according to the specified sort order.
343    ///
344    /// # Errors
345    ///
346    /// Returns an error if the query execution fails (e.g., I/O error, invalid query).
347    ///
348    /// # Example
349    ///
350    /// ```rust,ignore
351    /// let results = repository.load().await
352    ///     .after(start_time)
353    ///     .with_tag("environment", "production")
354    ///     .execute()
355    ///     .await?;
356    ///
357    /// for (key, context) in results {
358    ///     println!("Timestamp: {}", key.timestamp);
359    ///     println!("Metrics: {:?}", context.all_metrics());
360    /// }
361    /// ```
362    #[instrument(skip(self), fields(
363        query.filters.time_range = format_args!("{:?}-{:?}", self.after, self.before),
364        query.filters.tag_count = self.tags.len(),
365        query.limit = self.limit,
366        query.offset = self.offset
367    ))]
368    pub async fn execute(self) -> Result<Vec<(ResultKey, AnalyzerContext)>> {
369        // Validate query parameters before execution
370        self.validate()?;
371
372        // This is a default implementation that can be overridden by specific repositories
373        // for more efficient querying. For now, we'll load all keys and filter in memory.
374
375        let all_keys = self.repository.list_keys().await?;
376
377        let mut filtered_results = Vec::new();
378
379        for key in all_keys {
380            // Apply time filters
381            if let Some(before) = self.before {
382                if key.timestamp >= before {
383                    continue;
384                }
385            }
386
387            if let Some(after) = self.after {
388                if key.timestamp < after {
389                    continue;
390                }
391            }
392
393            // Apply tag filters
394            if !key.matches_tags(&self.tags) {
395                continue;
396            }
397
398            // Load the context for this key
399            // Try to get the actual context from the repository
400            let context = match self.repository.get(&key).await {
401                Ok(Some(ctx)) => ctx,
402                _ => AnalyzerContext::new(),
403            };
404
405            // Apply analyzer filter if specified
406            if let Some(ref analyzers) = self.analyzers {
407                // Check if any of the requested analyzers have metrics
408                let has_analyzer = analyzers
409                    .iter()
410                    .any(|analyzer| !context.get_analyzer_metrics(analyzer).is_empty());
411
412                if !has_analyzer && !context.all_metrics().is_empty() {
413                    continue;
414                }
415            }
416
417            filtered_results.push((key, context));
418        }
419
420        // Sort results
421        match self.sort_order {
422            SortOrder::Ascending => {
423                filtered_results.sort_by_key(|(key, _)| key.timestamp);
424            }
425            SortOrder::Descending => {
426                filtered_results.sort_by_key(|(key, _)| -key.timestamp);
427            }
428        }
429
430        // Apply pagination
431        if let Some(offset) = self.offset {
432            filtered_results = filtered_results.into_iter().skip(offset).collect();
433        }
434
435        if let Some(limit) = self.limit {
436            filtered_results.truncate(limit);
437        }
438
439        Ok(filtered_results)
440    }
441
442    /// Returns a count of metrics matching the query criteria without loading them.
443    ///
444    /// This is more efficient than executing the query and counting results
445    /// when only the count is needed.
446    ///
447    /// # Errors
448    ///
449    /// Returns an error if the count operation fails.
450    #[instrument(skip(self), fields(
451        query.filters.time_range = format_args!("{:?}-{:?}", self.after, self.before),
452        query.filters.tag_count = self.tags.len()
453    ))]
454    pub async fn count(self) -> Result<usize> {
455        // Default implementation executes the query and counts results
456        // Specific repositories can override this for efficiency
457        let results = self.execute().await?;
458        Ok(results.len())
459    }
460
461    /// Checks if any metrics match the query criteria.
462    ///
463    /// # Returns
464    ///
465    /// Returns `true` if at least one metric matches, `false` otherwise.
466    ///
467    /// # Errors
468    ///
469    /// Returns an error if the check operation fails.
470    #[instrument(skip(self), fields(
471        query.filters.time_range = format_args!("{:?}-{:?}", self.after, self.before),
472        query.filters.tag_count = self.tags.len()
473    ))]
474    pub async fn exists(self) -> Result<bool> {
475        let limited = self.limit(1);
476        let results = limited.execute().await?;
477        Ok(!results.is_empty())
478    }
479
480    /// Accessor methods for DataFusion integration
481    pub fn get_before(&self) -> Option<i64> {
482        self.before
483    }
484
485    pub fn get_after(&self) -> Option<i64> {
486        self.after
487    }
488
489    pub fn get_tags(&self) -> &HashMap<String, String> {
490        &self.tags
491    }
492
493    pub fn get_analyzers(&self) -> &Option<Vec<String>> {
494        &self.analyzers
495    }
496
497    pub fn get_limit(&self) -> Option<usize> {
498        self.limit
499    }
500
501    pub fn get_offset(&self) -> Option<usize> {
502        self.offset
503    }
504
505    pub fn get_sort_order(&self) -> SortOrder {
506        self.sort_order
507    }
508
509    pub fn is_ascending(&self) -> bool {
510        self.sort_order == SortOrder::Ascending
511    }
512}
513
514/// Extension trait for repositories to provide custom query execution.
515#[async_trait]
516pub trait QueryExecutor: MetricsRepository {
517    /// Executes a query with repository-specific optimizations.
518    ///
519    /// Repositories can implement this method to provide more efficient
520    /// query execution than the default in-memory filtering.
521    #[instrument(skip(self, query))]
522    async fn execute_query(
523        &self,
524        query: MetricsQuery,
525    ) -> Result<Vec<(ResultKey, AnalyzerContext)>> {
526        // Default implementation delegates to the query's execute method
527        query.execute().await
528    }
529}
530
#[cfg(test)]
mod tests {
    use super::*;

    use super::MetricsRepository;

    /// Stateless repository stub that always reports the same four keys.
    struct MockRepository;

    #[async_trait]
    impl MetricsRepository for MockRepository {
        async fn save(&self, _key: ResultKey, _metrics: AnalyzerContext) -> Result<()> {
            Ok(())
        }

        async fn load(&self) -> MetricsQuery {
            MetricsQuery::new(Arc::new(MockRepository))
        }

        async fn delete(&self, _key: ResultKey) -> Result<()> {
            Ok(())
        }

        async fn list_keys(&self) -> Result<Vec<ResultKey>> {
            let prod_untagged = ResultKey::new(1000).with_tag("env", "prod");
            let staging = ResultKey::new(2000).with_tag("env", "staging");
            let prod_v1 = ResultKey::new(3000)
                .with_tag("env", "prod")
                .with_tag("version", "1.0");
            let prod_v2 = ResultKey::new(4000)
                .with_tag("env", "prod")
                .with_tag("version", "2.0");

            Ok(vec![prod_untagged, staging, prod_v1, prod_v2])
        }
    }

    /// Starts a fresh query against the mock repository.
    fn mock_query() -> MetricsQuery {
        MetricsQuery::new(Arc::new(MockRepository))
    }

    /// Collects the key timestamps of `rows` in result order.
    fn timestamps(rows: &[(ResultKey, AnalyzerContext)]) -> Vec<i64> {
        rows.iter().map(|(key, _)| key.timestamp).collect()
    }

    #[tokio::test]
    async fn test_query_time_filters() {
        let rows = mock_query()
            .after(1500)
            .before(3500)
            .execute()
            .await
            .unwrap();

        // Default order is newest-first.
        assert_eq!(timestamps(&rows), vec![3000, 2000]);
    }

    #[tokio::test]
    async fn test_query_tag_filters() {
        let rows = mock_query()
            .with_tag("env", "prod")
            .execute()
            .await
            .unwrap();

        assert_eq!(rows.len(), 3);
    }

    #[tokio::test]
    async fn test_query_multiple_tags() {
        let rows = mock_query()
            .with_tag("env", "prod")
            .with_tag("version", "1.0")
            .execute()
            .await
            .unwrap();

        assert_eq!(timestamps(&rows), vec![3000]);
    }

    #[tokio::test]
    async fn test_query_sort_order() {
        // Descending is the default.
        let newest_first = timestamps(&mock_query().execute().await.unwrap());
        assert_eq!(newest_first[0], 4000);
        assert_eq!(newest_first[3], 1000);

        // Explicit ascending order.
        let oldest_first = timestamps(
            &mock_query()
                .sort(SortOrder::Ascending)
                .execute()
                .await
                .unwrap(),
        );
        assert_eq!(oldest_first[0], 1000);
        assert_eq!(oldest_first[3], 4000);
    }

    #[tokio::test]
    async fn test_query_pagination() {
        let rows = mock_query()
            .sort(SortOrder::Ascending)
            .offset(1)
            .limit(2)
            .execute()
            .await
            .unwrap();

        assert_eq!(timestamps(&rows), vec![2000, 3000]);
    }

    #[tokio::test]
    async fn test_query_exists() {
        let found = mock_query()
            .with_tag("env", "prod")
            .exists()
            .await
            .unwrap();
        assert!(found);

        let missing = mock_query()
            .with_tag("env", "nonexistent")
            .exists()
            .await
            .unwrap();
        assert!(!missing);
    }
}