1use async_trait::async_trait;
4use std::collections::HashMap;
5use std::sync::Arc;
6use tokio::sync::RwLock;
7use tracing::instrument;
8
9use crate::analyzers::context::AnalyzerContext;
10use crate::error::{Result, TermError};
11
12use super::{MetricsQuery, MetricsRepository, RepositoryMetadata, ResultKey};
13
14#[derive(Clone)]
41pub struct InMemoryRepository {
42 storage: Arc<RwLock<HashMap<ResultKey, AnalyzerContext>>>,
44
45 metadata: Arc<RwLock<RepositoryMetadata>>,
47}
48
49impl InMemoryRepository {
50 pub fn new() -> Self {
52 let mut metadata = RepositoryMetadata::new("in_memory");
53 metadata.total_metrics = Some(0);
54 Self {
55 storage: Arc::new(RwLock::new(HashMap::new())),
56 metadata: Arc::new(RwLock::new(metadata)),
57 }
58 }
59
60 pub fn with_data(data: HashMap<ResultKey, AnalyzerContext>) -> Self {
66 let repo = Self::new();
67 let storage = repo.storage.clone();
68
69 tokio::spawn(async move {
70 let mut store = storage.write().await;
71 store.extend(data);
72 });
73
74 repo
75 }
76
77 pub async fn size(&self) -> usize {
79 self.storage.read().await.len()
80 }
81
82 pub async fn clear(&mut self) {
84 self.storage.write().await.clear();
85 self.update_metadata().await;
86 }
87
88 async fn update_metadata(&self) {
90 let store = self.storage.read().await;
91 let mut metadata = self.metadata.write().await;
92
93 metadata.total_metrics = Some(store.len());
94 metadata.last_modified = Some(chrono::Utc::now());
95
96 let size_bytes: usize = store
98 .iter()
99 .map(|(k, v)| {
100 std::mem::size_of_val(k)
102 + std::mem::size_of_val(v)
103 + k.tags
104 .iter()
105 .map(|(key, val)| key.len() + val.len())
106 .sum::<usize>()
107 })
108 .sum();
109
110 metadata.storage_size_bytes = Some(size_bytes as u64);
111 }
112}
113
114impl Default for InMemoryRepository {
115 fn default() -> Self {
116 Self::new()
117 }
118}
119
120#[async_trait]
121impl MetricsRepository for InMemoryRepository {
122 #[instrument(skip(self, metrics), fields(key.timestamp = %key.timestamp, repository_type = "in_memory"))]
123 async fn save(&self, key: ResultKey, metrics: AnalyzerContext) -> Result<()> {
124 if let Err(validation_error) = key.validate_tags() {
126 return Err(TermError::repository_validation(
127 "tags",
128 validation_error,
129 key.to_string(),
130 ));
131 }
132
133 let normalized_key = key.to_normalized_storage_key();
135 let store = self.storage.read().await;
136
137 for existing_key in store.keys() {
139 if existing_key != &key && existing_key.to_normalized_storage_key() == normalized_key {
140 return Err(TermError::repository_key_collision(
141 key.to_string(),
142 format!("Key collision detected with existing key: {existing_key}"),
143 ));
144 }
145 }
146
147 drop(store);
148
149 let mut store = self.storage.write().await;
150 store.insert(key, metrics);
151 drop(store);
152
153 self.update_metadata().await;
154 Ok(())
155 }
156
157 #[instrument(skip(self))]
158 async fn load(&self) -> MetricsQuery {
159 MetricsQuery::new(Arc::new(self.clone()))
161 }
162
163 #[instrument(skip(self), fields(key.timestamp = %key.timestamp, repository_type = "in_memory"))]
164 async fn delete(&self, key: ResultKey) -> Result<()> {
165 let mut store = self.storage.write().await;
166
167 if store.remove(&key).is_none() {
168 return Err(TermError::repository(
169 "in_memory",
170 "delete",
171 format!("Key not found: {key}"),
172 ));
173 }
174
175 drop(store);
176 self.update_metadata().await;
177 Ok(())
178 }
179
180 #[instrument(skip(self))]
181 async fn list_keys(&self) -> Result<Vec<ResultKey>> {
182 let store = self.storage.read().await;
183 Ok(store.keys().cloned().collect())
184 }
185
186 #[instrument(skip(self), fields(key.timestamp = %key.timestamp, repository_type = "in_memory"))]
187 async fn get(&self, key: &ResultKey) -> Result<Option<AnalyzerContext>> {
188 let store = self.storage.read().await;
189 Ok(store.get(key).cloned())
190 }
191
192 #[instrument(skip(self), fields(key.timestamp = %key.timestamp, repository_type = "in_memory"))]
193 async fn exists(&self, key: &ResultKey) -> Result<bool> {
194 let store = self.storage.read().await;
195 Ok(store.contains_key(key))
196 }
197
198 #[instrument(skip(self))]
199 async fn metadata(&self) -> Result<RepositoryMetadata> {
200 Ok(self.metadata.read().await.clone())
201 }
202}
203
204impl InMemoryRepository {
206 pub async fn load_with_data(&self) -> InMemoryMetricsQuery {
208 InMemoryMetricsQuery::new(self.clone())
209 }
210
211 pub async fn should_use_datafusion(&self) -> bool {
216 const DATAFUSION_THRESHOLD: usize = 1000; self.size().await >= DATAFUSION_THRESHOLD
218 }
219}
220
221pub struct InMemoryMetricsQuery {
223 repository: InMemoryRepository,
224 before: Option<i64>,
225 after: Option<i64>,
226 tags: HashMap<String, String>,
227 analyzers: Option<Vec<String>>,
228 limit: Option<usize>,
229 offset: Option<usize>,
230 sort_order: super::query::SortOrder,
231}
232
233impl InMemoryMetricsQuery {
234 pub fn new(repository: InMemoryRepository) -> Self {
235 Self {
236 repository,
237 before: None,
238 after: None,
239 tags: HashMap::new(),
240 analyzers: None,
241 limit: None,
242 offset: None,
243 sort_order: super::query::SortOrder::Descending,
244 }
245 }
246
247 pub fn before(mut self, timestamp: i64) -> Self {
248 self.before = Some(timestamp);
249 self
250 }
251
252 pub fn after(mut self, timestamp: i64) -> Self {
253 self.after = Some(timestamp);
254 self
255 }
256
257 pub fn between(mut self, start: i64, end: i64) -> Self {
258 self.after = Some(start);
259 self.before = Some(end);
260 self
261 }
262
263 pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
264 self.tags.insert(key.into(), value.into());
265 self
266 }
267
268 pub fn with_tags<I, K, V>(mut self, tags: I) -> Self
269 where
270 I: IntoIterator<Item = (K, V)>,
271 K: Into<String>,
272 V: Into<String>,
273 {
274 for (k, v) in tags {
275 self.tags.insert(k.into(), v.into());
276 }
277 self
278 }
279
280 pub fn for_analyzers<I, S>(mut self, analyzers: I) -> Self
281 where
282 I: IntoIterator<Item = S>,
283 S: Into<String>,
284 {
285 self.analyzers = Some(analyzers.into_iter().map(|s| s.into()).collect());
286 self
287 }
288
289 pub fn limit(mut self, limit: usize) -> Self {
290 self.limit = Some(limit);
291 self
292 }
293
294 pub fn offset(mut self, offset: usize) -> Self {
295 self.offset = Some(offset);
296 self
297 }
298
299 pub fn sort(mut self, order: super::query::SortOrder) -> Self {
300 self.sort_order = order;
301 self
302 }
303
304 #[instrument(skip(self), fields(
305 query.filters.time_range = format_args!("{:?}-{:?}", self.after, self.before),
306 query.filters.tag_count = self.tags.len(),
307 query.limit = self.limit,
308 query.offset = self.offset
309 ))]
310 pub async fn execute(self) -> Result<Vec<(ResultKey, AnalyzerContext)>> {
311 self.repository
312 .execute_query_optimized(
313 self.before,
314 self.after,
315 &self.tags,
316 &self.analyzers,
317 self.limit,
318 self.offset,
319 self.sort_order == super::query::SortOrder::Ascending,
320 )
321 .await
322 }
323
324 #[instrument(skip(self))]
325 pub async fn count(self) -> Result<usize> {
326 let results = self.execute().await?;
327 Ok(results.len())
328 }
329
330 #[instrument(skip(self))]
331 pub async fn exists(self) -> Result<bool> {
332 let limited = self.limit(1);
333 let results = limited.execute().await?;
334 Ok(!results.is_empty())
335 }
336}
337
338impl InMemoryRepository {
340 #[allow(clippy::too_many_arguments)]
342 #[instrument(skip(self, tags, analyzers), fields(
343 repository_type = "in_memory",
344 time_range.before = before,
345 time_range.after = after,
346 limit = limit,
347 offset = offset,
348 ascending = ascending
349 ))]
350 pub async fn execute_query_optimized(
351 &self,
352 before: Option<i64>,
353 after: Option<i64>,
354 tags: &HashMap<String, String>,
355 analyzers: &Option<Vec<String>>,
356 limit: Option<usize>,
357 offset: Option<usize>,
358 ascending: bool,
359 ) -> Result<Vec<(ResultKey, AnalyzerContext)>> {
360 let store = self.storage.read().await;
361
362 let mut results: Vec<(ResultKey, AnalyzerContext)> = store
364 .iter()
365 .filter(|(key, _)| {
366 if let Some(before) = before {
368 if key.timestamp >= before {
369 return false;
370 }
371 }
372 if let Some(after) = after {
373 if key.timestamp < after {
374 return false;
375 }
376 }
377
378 if !key.matches_tags(tags) {
380 return false;
381 }
382
383 true
384 })
385 .filter(|(_, context)| {
386 if let Some(ref analyzers) = analyzers {
388 analyzers
389 .iter()
390 .any(|analyzer| !context.get_analyzer_metrics(analyzer).is_empty())
391 } else {
392 true
393 }
394 })
395 .map(|(k, v)| (k.clone(), v.clone()))
396 .collect();
397
398 if ascending {
400 results.sort_by_key(|(key, _)| key.timestamp);
401 } else {
402 results.sort_by_key(|(key, _)| -key.timestamp);
403 }
404
405 let start = offset.unwrap_or(0);
407 let end = if let Some(limit) = limit {
408 (start + limit).min(results.len())
409 } else {
410 results.len()
411 };
412
413 Ok(results[start..end].to_vec())
414 }
415}
416
#[cfg(test)]
mod tests {
    use super::*;
    use crate::analyzers::types::MetricValue;
    use crate::repository::MetricsRepository;

    /// Saves one entry, checks it is visible through `exists`, `list_keys`
    /// and `size`, then deletes it again.
    #[tokio::test]
    async fn test_in_memory_repository_basic_operations() {
        let repo = InMemoryRepository::new();

        let key = ResultKey::new(1000).with_tag("env", "test");
        let mut context = AnalyzerContext::new();
        context.store_metric("size", MetricValue::Long(100));

        repo.save(key.clone(), context).await.unwrap();

        // The entry is visible through every read path.
        assert!(repo.exists(&key).await.unwrap());

        let listed = repo.list_keys().await.unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0], key);

        assert_eq!(repo.size().await, 1);

        // Deleting removes it again.
        repo.delete(key.clone()).await.unwrap();
        assert!(!repo.exists(&key).await.unwrap());
        assert_eq!(repo.size().await, 0);
    }

    /// Metadata starts at zero entries and is refreshed by `save`.
    #[tokio::test]
    async fn test_in_memory_repository_metadata() {
        let repo = InMemoryRepository::new();

        let initial = repo.metadata().await.unwrap();
        assert_eq!(initial.backend_type, Some("in_memory".to_string()));
        assert_eq!(initial.total_metrics, Some(0));

        repo.save(
            ResultKey::now().with_tag("test", "value"),
            AnalyzerContext::new(),
        )
        .await
        .unwrap();

        let updated = repo.metadata().await.unwrap();
        assert_eq!(updated.total_metrics, Some(1));
        assert!(updated.last_modified.is_some());
        assert!(updated.storage_size_bytes.is_some());
    }

    /// Exercises tag filtering, time-range filtering and pagination.
    #[tokio::test]
    async fn test_in_memory_repository_query() {
        let repo = InMemoryRepository::new();

        // Five entries at timestamps 0, 1000, ..., 4000; even ones are "prod".
        for idx in 0..5 {
            let env = if idx % 2 == 0 { "prod" } else { "staging" };
            let key = ResultKey::new(idx * 1000)
                .with_tag("env", env)
                .with_tag("version", format!("v{idx}"));

            let mut context = AnalyzerContext::new();
            context.store_metric("size", MetricValue::Long(idx * 100));

            repo.save(key, context).await.unwrap();
        }

        // Tag filter: entries 0, 2 and 4 are "prod".
        let by_tag = repo
            .load()
            .await
            .with_tag("env", "prod")
            .execute()
            .await
            .unwrap();
        assert_eq!(by_tag.len(), 3);

        // Time range: 1000, 2000 and 3000 fall inside [1000, 4000).
        let by_time = repo
            .load()
            .await
            .after(1000)
            .before(4000)
            .execute()
            .await
            .unwrap();
        assert_eq!(by_time.len(), 3);

        // Pagination: skip one, take two.
        let page = repo
            .load()
            .await
            .limit(2)
            .offset(1)
            .execute()
            .await
            .unwrap();
        assert_eq!(page.len(), 2);
    }

    /// `clear` empties the repository.
    #[tokio::test]
    async fn test_in_memory_repository_clear() {
        let mut repo = InMemoryRepository::new();

        for idx in 0..3 {
            repo.save(ResultKey::new(idx * 1000), AnalyzerContext::new())
                .await
                .unwrap();
        }
        assert_eq!(repo.size().await, 3);

        repo.clear().await;
        assert_eq!(repo.size().await, 0);

        let remaining = repo.list_keys().await.unwrap();
        assert!(remaining.is_empty());
    }
}