term_guard/repository/
in_memory.rs

1//! In-memory implementation of MetricsRepository for testing and development.
2
3use async_trait::async_trait;
4use std::collections::HashMap;
5use std::sync::Arc;
6use tokio::sync::RwLock;
7use tracing::instrument;
8
9use crate::analyzers::context::AnalyzerContext;
10use crate::error::{Result, TermError};
11
12use super::{MetricsQuery, MetricsRepository, RepositoryMetadata, ResultKey};
13
14/// In-memory implementation of the MetricsRepository trait.
15///
16/// This implementation stores all metrics in memory and is useful for:
17/// - Testing and development
18/// - Small-scale applications
19/// - Caching layers
20///
21/// # Example
22///
23/// ```rust,ignore
24/// use term_guard::repository::{InMemoryRepository, ResultKey};
25/// use term_guard::analyzers::AnalyzerContext;
26///
27/// let repository = InMemoryRepository::new();
28///
29/// // Save metrics
30/// let key = ResultKey::now().with_tag("env", "test");
31/// let context = AnalyzerContext::new();
32/// repository.save(key, context).await?;
33///
34/// // Query metrics
35/// let results = repository.load().await
36///     .with_tag("env", "test")
37///     .execute()
38///     .await?;
39/// ```
#[derive(Clone)]
pub struct InMemoryRepository {
    /// Storage for metrics, keyed by ResultKey.
    /// Held behind `Arc<RwLock<..>>`, so a `clone()` of the repository is a
    /// cheap handle that shares the same underlying map.
    storage: Arc<RwLock<HashMap<ResultKey, AnalyzerContext>>>,

    /// Repository metadata.
    /// Shared the same way as `storage`; refreshed by `update_metadata`
    /// after mutations (save/delete/clear).
    metadata: Arc<RwLock<RepositoryMetadata>>,
}
48
49impl InMemoryRepository {
50    /// Creates a new empty in-memory repository.
51    pub fn new() -> Self {
52        let mut metadata = RepositoryMetadata::new("in_memory");
53        metadata.total_metrics = Some(0);
54        Self {
55            storage: Arc::new(RwLock::new(HashMap::new())),
56            metadata: Arc::new(RwLock::new(metadata)),
57        }
58    }
59
60    /// Creates a new repository with pre-populated data.
61    ///
62    /// # Arguments
63    ///
64    /// * `data` - Initial metrics to populate the repository with
65    pub fn with_data(data: HashMap<ResultKey, AnalyzerContext>) -> Self {
66        let repo = Self::new();
67        let storage = repo.storage.clone();
68
69        tokio::spawn(async move {
70            let mut store = storage.write().await;
71            store.extend(data);
72        });
73
74        repo
75    }
76
77    /// Returns the number of stored metrics.
78    pub async fn size(&self) -> usize {
79        self.storage.read().await.len()
80    }
81
82    /// Clears all stored metrics.
83    pub async fn clear(&mut self) {
84        self.storage.write().await.clear();
85        self.update_metadata().await;
86    }
87
88    /// Updates repository metadata after changes.
89    async fn update_metadata(&self) {
90        let store = self.storage.read().await;
91        let mut metadata = self.metadata.write().await;
92
93        metadata.total_metrics = Some(store.len());
94        metadata.last_modified = Some(chrono::Utc::now());
95
96        // Calculate approximate storage size
97        let size_bytes: usize = store
98            .iter()
99            .map(|(k, v)| {
100                // Rough estimation of memory usage
101                std::mem::size_of_val(k)
102                    + std::mem::size_of_val(v)
103                    + k.tags
104                        .iter()
105                        .map(|(key, val)| key.len() + val.len())
106                        .sum::<usize>()
107            })
108            .sum();
109
110        metadata.storage_size_bytes = Some(size_bytes as u64);
111    }
112}
113
114impl Default for InMemoryRepository {
115    fn default() -> Self {
116        Self::new()
117    }
118}
119
120#[async_trait]
121impl MetricsRepository for InMemoryRepository {
122    #[instrument(skip(self, metrics), fields(key.timestamp = %key.timestamp, repository_type = "in_memory"))]
123    async fn save(&self, key: ResultKey, metrics: AnalyzerContext) -> Result<()> {
124        // Validate the key before saving
125        if let Err(validation_error) = key.validate_tags() {
126            return Err(TermError::repository_validation(
127                "tags",
128                validation_error,
129                key.to_string(),
130            ));
131        }
132
133        // Check for potential key collisions using normalized keys
134        let normalized_key = key.to_normalized_storage_key();
135        let store = self.storage.read().await;
136
137        // Check if a different key with the same normalized representation exists
138        for existing_key in store.keys() {
139            if existing_key != &key && existing_key.to_normalized_storage_key() == normalized_key {
140                return Err(TermError::repository_key_collision(
141                    key.to_string(),
142                    format!("Key collision detected with existing key: {existing_key}"),
143                ));
144            }
145        }
146
147        drop(store);
148
149        let mut store = self.storage.write().await;
150        store.insert(key, metrics);
151        drop(store);
152
153        self.update_metadata().await;
154        Ok(())
155    }
156
157    #[instrument(skip(self))]
158    async fn load(&self) -> MetricsQuery {
159        // Create a special query that has access to the storage
160        MetricsQuery::new(Arc::new(self.clone()))
161    }
162
163    #[instrument(skip(self), fields(key.timestamp = %key.timestamp, repository_type = "in_memory"))]
164    async fn delete(&self, key: ResultKey) -> Result<()> {
165        let mut store = self.storage.write().await;
166
167        if store.remove(&key).is_none() {
168            return Err(TermError::repository(
169                "in_memory",
170                "delete",
171                format!("Key not found: {key}"),
172            ));
173        }
174
175        drop(store);
176        self.update_metadata().await;
177        Ok(())
178    }
179
180    #[instrument(skip(self))]
181    async fn list_keys(&self) -> Result<Vec<ResultKey>> {
182        let store = self.storage.read().await;
183        Ok(store.keys().cloned().collect())
184    }
185
186    #[instrument(skip(self), fields(key.timestamp = %key.timestamp, repository_type = "in_memory"))]
187    async fn get(&self, key: &ResultKey) -> Result<Option<AnalyzerContext>> {
188        let store = self.storage.read().await;
189        Ok(store.get(key).cloned())
190    }
191
192    #[instrument(skip(self), fields(key.timestamp = %key.timestamp, repository_type = "in_memory"))]
193    async fn exists(&self, key: &ResultKey) -> Result<bool> {
194        let store = self.storage.read().await;
195        Ok(store.contains_key(key))
196    }
197
198    #[instrument(skip(self))]
199    async fn metadata(&self) -> Result<RepositoryMetadata> {
200        Ok(self.metadata.read().await.clone())
201    }
202}
203
204/// Override the load method to return actual stored contexts
205impl InMemoryRepository {
206    /// Loads a query that will return actual stored contexts.
207    pub async fn load_with_data(&self) -> InMemoryMetricsQuery {
208        InMemoryMetricsQuery::new(self.clone())
209    }
210
211    /// Determines if the dataset is large enough to benefit from DataFusion.
212    ///
213    /// DataFusion has overhead for small datasets, so we only use it when
214    /// the performance benefits outweigh the setup costs.
215    pub async fn should_use_datafusion(&self) -> bool {
216        const DATAFUSION_THRESHOLD: usize = 1000; // Threshold based on benchmarks
217        self.size().await >= DATAFUSION_THRESHOLD
218    }
219}
220
/// A custom query implementation for in-memory repository that returns actual data.
pub struct InMemoryMetricsQuery {
    /// Repository the query executes against (clones share the same storage).
    repository: InMemoryRepository,
    /// Exclusive upper bound on key timestamps: entries with
    /// `timestamp >= before` are filtered out.
    before: Option<i64>,
    /// Inclusive lower bound on key timestamps: entries with
    /// `timestamp < after` are filtered out.
    after: Option<i64>,
    /// Tags the key must match (checked via `ResultKey::matches_tags`).
    tags: HashMap<String, String>,
    /// If set, keep only entries where at least one of the listed analyzers
    /// has non-empty metrics in the stored context.
    analyzers: Option<Vec<String>>,
    /// Maximum number of results returned after sorting.
    limit: Option<usize>,
    /// Number of sorted results to skip before returning.
    offset: Option<usize>,
    /// Timestamp sort direction; `new` defaults this to descending.
    sort_order: super::query::SortOrder,
}
232
233impl InMemoryMetricsQuery {
234    pub fn new(repository: InMemoryRepository) -> Self {
235        Self {
236            repository,
237            before: None,
238            after: None,
239            tags: HashMap::new(),
240            analyzers: None,
241            limit: None,
242            offset: None,
243            sort_order: super::query::SortOrder::Descending,
244        }
245    }
246
247    pub fn before(mut self, timestamp: i64) -> Self {
248        self.before = Some(timestamp);
249        self
250    }
251
252    pub fn after(mut self, timestamp: i64) -> Self {
253        self.after = Some(timestamp);
254        self
255    }
256
257    pub fn between(mut self, start: i64, end: i64) -> Self {
258        self.after = Some(start);
259        self.before = Some(end);
260        self
261    }
262
263    pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
264        self.tags.insert(key.into(), value.into());
265        self
266    }
267
268    pub fn with_tags<I, K, V>(mut self, tags: I) -> Self
269    where
270        I: IntoIterator<Item = (K, V)>,
271        K: Into<String>,
272        V: Into<String>,
273    {
274        for (k, v) in tags {
275            self.tags.insert(k.into(), v.into());
276        }
277        self
278    }
279
280    pub fn for_analyzers<I, S>(mut self, analyzers: I) -> Self
281    where
282        I: IntoIterator<Item = S>,
283        S: Into<String>,
284    {
285        self.analyzers = Some(analyzers.into_iter().map(|s| s.into()).collect());
286        self
287    }
288
289    pub fn limit(mut self, limit: usize) -> Self {
290        self.limit = Some(limit);
291        self
292    }
293
294    pub fn offset(mut self, offset: usize) -> Self {
295        self.offset = Some(offset);
296        self
297    }
298
299    pub fn sort(mut self, order: super::query::SortOrder) -> Self {
300        self.sort_order = order;
301        self
302    }
303
304    #[instrument(skip(self), fields(
305        query.filters.time_range = format_args!("{:?}-{:?}", self.after, self.before),
306        query.filters.tag_count = self.tags.len(),
307        query.limit = self.limit,
308        query.offset = self.offset
309    ))]
310    pub async fn execute(self) -> Result<Vec<(ResultKey, AnalyzerContext)>> {
311        self.repository
312            .execute_query_optimized(
313                self.before,
314                self.after,
315                &self.tags,
316                &self.analyzers,
317                self.limit,
318                self.offset,
319                self.sort_order == super::query::SortOrder::Ascending,
320            )
321            .await
322    }
323
324    #[instrument(skip(self))]
325    pub async fn count(self) -> Result<usize> {
326        let results = self.execute().await?;
327        Ok(results.len())
328    }
329
330    #[instrument(skip(self))]
331    pub async fn exists(self) -> Result<bool> {
332        let limited = self.limit(1);
333        let results = limited.execute().await?;
334        Ok(!results.is_empty())
335    }
336}
337
338/// Custom query executor for in-memory repository with optimized filtering.
339impl InMemoryRepository {
340    /// Executes a query with in-memory optimizations.
341    #[allow(clippy::too_many_arguments)]
342    #[instrument(skip(self, tags, analyzers), fields(
343        repository_type = "in_memory",
344        time_range.before = before,
345        time_range.after = after,
346        limit = limit,
347        offset = offset,
348        ascending = ascending
349    ))]
350    pub async fn execute_query_optimized(
351        &self,
352        before: Option<i64>,
353        after: Option<i64>,
354        tags: &HashMap<String, String>,
355        analyzers: &Option<Vec<String>>,
356        limit: Option<usize>,
357        offset: Option<usize>,
358        ascending: bool,
359    ) -> Result<Vec<(ResultKey, AnalyzerContext)>> {
360        let store = self.storage.read().await;
361
362        // Filter and collect results
363        let mut results: Vec<(ResultKey, AnalyzerContext)> = store
364            .iter()
365            .filter(|(key, _)| {
366                // Time range filter
367                if let Some(before) = before {
368                    if key.timestamp >= before {
369                        return false;
370                    }
371                }
372                if let Some(after) = after {
373                    if key.timestamp < after {
374                        return false;
375                    }
376                }
377
378                // Tag filter
379                if !key.matches_tags(tags) {
380                    return false;
381                }
382
383                true
384            })
385            .filter(|(_, context)| {
386                // Analyzer filter
387                if let Some(ref analyzers) = analyzers {
388                    analyzers
389                        .iter()
390                        .any(|analyzer| !context.get_analyzer_metrics(analyzer).is_empty())
391                } else {
392                    true
393                }
394            })
395            .map(|(k, v)| (k.clone(), v.clone()))
396            .collect();
397
398        // Sort results
399        if ascending {
400            results.sort_by_key(|(key, _)| key.timestamp);
401        } else {
402            results.sort_by_key(|(key, _)| -key.timestamp);
403        }
404
405        // Apply pagination
406        let start = offset.unwrap_or(0);
407        let end = if let Some(limit) = limit {
408            (start + limit).min(results.len())
409        } else {
410            results.len()
411        };
412
413        Ok(results[start..end].to_vec())
414    }
415}
416
#[cfg(test)]
mod tests {
    use super::*;
    use crate::analyzers::types::MetricValue;
    use crate::repository::MetricsRepository;

    // Exercises save/exists/list_keys/size/delete round-trip on one entry.
    #[tokio::test]
    async fn test_in_memory_repository_basic_operations() {
        let repo = InMemoryRepository::new();

        // Test save
        let key1 = ResultKey::new(1000).with_tag("env", "test");
        let mut context1 = AnalyzerContext::new();
        context1.store_metric("size", MetricValue::Long(100));

        repo.save(key1.clone(), context1.clone()).await.unwrap();

        // Test exists
        assert!(repo.exists(&key1).await.unwrap());

        // Test list_keys
        let keys = repo.list_keys().await.unwrap();
        assert_eq!(keys.len(), 1);
        assert_eq!(keys[0], key1);

        // Test size
        assert_eq!(repo.size().await, 1);

        // Test delete
        repo.delete(key1.clone()).await.unwrap();
        assert!(!repo.exists(&key1).await.unwrap());
        assert_eq!(repo.size().await, 0);
    }

    // Verifies metadata starts at zero metrics and is refreshed after a save.
    #[tokio::test]
    async fn test_in_memory_repository_metadata() {
        let repo = InMemoryRepository::new();

        let metadata = repo.metadata().await.unwrap();
        assert_eq!(metadata.backend_type, Some("in_memory".to_string()));
        assert_eq!(metadata.total_metrics, Some(0));

        // Add some data
        let key = ResultKey::now().with_tag("test", "value");
        let context = AnalyzerContext::new();
        repo.save(key, context).await.unwrap();

        let metadata = repo.metadata().await.unwrap();
        assert_eq!(metadata.total_metrics, Some(1));
        assert!(metadata.last_modified.is_some());
        assert!(metadata.storage_size_bytes.is_some());
    }

    // Covers tag filtering, the half-open [after, before) time range, and
    // limit/offset pagination through the generic `load()` query.
    #[tokio::test]
    async fn test_in_memory_repository_query() {
        let repo = InMemoryRepository::new();

        // Add test data
        for i in 0..5 {
            let key = ResultKey::new(i * 1000)
                .with_tag("env", if i % 2 == 0 { "prod" } else { "staging" })
                .with_tag("version", format!("v{i}"));

            let mut context = AnalyzerContext::new();
            context.store_metric("size", MetricValue::Long(i * 100));

            repo.save(key, context).await.unwrap();
        }

        // Test query with tag filter
        let results = repo
            .load()
            .await
            .with_tag("env", "prod")
            .execute()
            .await
            .unwrap();

        assert_eq!(results.len(), 3); // 0, 2, 4 are prod

        // Test query with time range
        let results = repo
            .load()
            .await
            .after(1000)
            .before(4000)
            .execute()
            .await
            .unwrap();

        assert_eq!(results.len(), 3); // 1000, 2000, 3000

        // Test query with pagination
        let results = repo
            .load()
            .await
            .limit(2)
            .offset(1)
            .execute()
            .await
            .unwrap();

        assert_eq!(results.len(), 2);
    }

    // Confirms clear() empties both the storage and the key listing.
    #[tokio::test]
    async fn test_in_memory_repository_clear() {
        let mut repo = InMemoryRepository::new();

        // Add data
        for i in 0..3 {
            let key = ResultKey::new(i * 1000);
            let context = AnalyzerContext::new();
            repo.save(key, context).await.unwrap();
        }

        assert_eq!(repo.size().await, 3);

        // Clear repository
        repo.clear().await;
        assert_eq!(repo.size().await, 0);

        let keys = repo.list_keys().await.unwrap();
        assert!(keys.is_empty());
    }
}