use crate::models::{Area, Project, Task};
use anyhow::Result;
use chrono::{DateTime, Utc};
use moka::future::Cache;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, warn};
use uuid::Uuid;

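/// Configuration options for the query cache.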
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryCacheConfig {
    /// Maximum number of cached queries per cache.
    pub max_queries: u64,
    /// Time-to-live for cached entries.
    pub ttl: Duration,
    /// Time-to-idle for cached entries.
    pub tti: Duration,
    /// Whether result compression is enabled.
    pub enable_compression: bool,
    /// Maximum serialized result size, in bytes, that will be cached.
    pub max_result_size: usize,
}

impl Default for QueryCacheConfig {
    fn default() -> Self {
        Self {
            max_queries: 1000,
            ttl: Duration::from_secs(1800),
            tti: Duration::from_secs(300),
            enable_compression: true,
            max_result_size: 1024 * 1024,
        }
    }
}

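/// A query result stored in the cache, along with the metadata used for
/// expiration, parameter matching, and invalidation.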
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedQueryResult<T> {
    /// The cached result data.
    pub data: T,
    /// When the query was executed.
    pub executed_at: DateTime<Utc>,
    /// When this entry expires.
    pub expires_at: DateTime<Utc>,
    /// How long the original query took to execute, in milliseconds.
    pub execution_time_ms: u64,
    /// Hash of the query parameters that produced this result.
    pub params_hash: String,
    /// Tables and entities this result depends on.
    pub dependencies: Vec<QueryDependency>,
    /// Serialized size of the result, in bytes.
    pub result_size: usize,
    /// Whether compression was enabled when this entry was cached.
    pub compressed: bool,
}

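/// A dependency of a cached query: a table (and optionally a specific entity)
/// together with the operations that invalidate results depending on it.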
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct QueryDependency {
    /// Name of the table the query depends on.
    pub table: String,
    /// Specific entity the query depends on, if any.
    pub entity_id: Option<Uuid>,
    /// Operations that invalidate results with this dependency.
    pub invalidating_operations: Vec<String>,
}

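/// Aggregate statistics about query cache usage.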
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QueryCacheStats {
    pub total_queries: u64,
    pub hits: u64,
    pub misses: u64,
    pub hit_rate: f64,
    pub total_size_bytes: u64,
    pub average_execution_time_ms: f64,
    pub compressed_queries: u64,
    pub uncompressed_queries: u64,
}

impl QueryCacheStats {
    pub fn calculate_hit_rate(&mut self) {
        let total = self.hits + self.misses;
        self.hit_rate = if total > 0 {
            #[allow(clippy::cast_precision_loss)]
            {
                self.hits as f64 / total as f64
            }
        } else {
            0.0
        };
    }
}

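/// Multi-layer cache for task, project, area, and search query results.
///
/// Entries are keyed by query key, validated against a parameter hash, and
/// expired via TTL/TTI; they can also be invalidated by entity or operation.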
pub struct QueryCache {
    /// Cached task query results.
    tasks_cache: Cache<String, CachedQueryResult<Vec<Task>>>,
    /// Cached project query results.
    projects_cache: Cache<String, CachedQueryResult<Vec<Project>>>,
    /// Cached area query results.
    areas_cache: Cache<String, CachedQueryResult<Vec<Area>>>,
    /// Cached search query results.
    search_cache: Cache<String, CachedQueryResult<Vec<Task>>>,
    /// Shared usage statistics.
    stats: Arc<RwLock<QueryCacheStats>>,
    /// Cache configuration.
    config: QueryCacheConfig,
}

impl QueryCache {
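    /// Creates a new query cache with the given configuration.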
    #[must_use]
    pub fn new(config: QueryCacheConfig) -> Self {
        let tasks_cache = Cache::builder()
            .max_capacity(config.max_queries)
            .time_to_live(config.ttl)
            .time_to_idle(config.tti)
            .build();

        let projects_cache = Cache::builder()
            .max_capacity(config.max_queries)
            .time_to_live(config.ttl)
            .time_to_idle(config.tti)
            .build();

        let areas_cache = Cache::builder()
            .max_capacity(config.max_queries)
            .time_to_live(config.ttl)
            .time_to_idle(config.tti)
            .build();

        let search_cache = Cache::builder()
            .max_capacity(config.max_queries)
            .time_to_live(config.ttl)
            .time_to_idle(config.tti)
            .build();

        Self {
            tasks_cache,
            projects_cache,
            areas_cache,
            search_cache,
            stats: Arc::new(RwLock::new(QueryCacheStats::default())),
            config,
        }
    }

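    /// Creates a new query cache with the default configuration.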
    #[must_use]
    pub fn new_default() -> Self {
        Self::new(QueryCacheConfig::default())
    }

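    /// Returns the cached result for a tasks query, or runs `fetcher` and
    /// caches its result.
    ///
    /// # Errors
    ///
    /// Returns an error if `fetcher` fails.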
    pub async fn cache_tasks_query<F, Fut>(
        &self,
        query_key: &str,
        params_hash: &str,
        fetcher: F,
    ) -> Result<Vec<Task>>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<Vec<Task>>>,
    {
        if let Some(cached) = self.tasks_cache.get(query_key).await {
            if !cached.is_expired() && cached.params_hash == params_hash {
                self.record_hit();
                debug!("Query cache hit for tasks: {}", query_key);
                return Ok(cached.data);
            }
        }

        let start_time = std::time::Instant::now();
        let data = fetcher().await?;
        #[allow(clippy::cast_possible_truncation)]
        let execution_time = start_time.elapsed().as_millis() as u64;

        let result_size = Self::calculate_result_size(&data);
        if result_size > self.config.max_result_size {
            warn!("Query result too large to cache: {} bytes", result_size);
            self.record_miss();
            return Ok(data);
        }

        let dependencies = Self::create_task_dependencies(&data);

        let cached_result = CachedQueryResult {
            data: data.clone(),
            executed_at: Utc::now(),
            expires_at: Utc::now()
                + chrono::Duration::from_std(self.config.ttl).unwrap_or_default(),
            execution_time_ms: execution_time,
            params_hash: params_hash.to_string(),
            dependencies,
            result_size,
            compressed: self.config.enable_compression,
        };

        self.tasks_cache
            .insert(query_key.to_string(), cached_result)
            .await;

        self.update_stats(result_size, execution_time, false);

        self.record_miss();
        debug!(
            "Cached tasks query: {} ({}ms, {} bytes)",
            query_key, execution_time, result_size
        );
        Ok(data)
    }

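    /// Returns the cached result for a projects query, or runs `fetcher` and
    /// caches its result.
    ///
    /// # Errors
    ///
    /// Returns an error if `fetcher` fails.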
    pub async fn cache_projects_query<F, Fut>(
        &self,
        query_key: &str,
        params_hash: &str,
        fetcher: F,
    ) -> Result<Vec<Project>>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<Vec<Project>>>,
    {
        if let Some(cached) = self.projects_cache.get(query_key).await {
            if !cached.is_expired() && cached.params_hash == params_hash {
                self.record_hit();
                debug!("Query cache hit for projects: {}", query_key);
                return Ok(cached.data);
            }
        }

        let start_time = std::time::Instant::now();
        let data = fetcher().await?;
        #[allow(clippy::cast_possible_truncation)]
        let execution_time = start_time.elapsed().as_millis() as u64;

        let result_size = Self::calculate_result_size(&data);
        if result_size > self.config.max_result_size {
            warn!("Query result too large to cache: {} bytes", result_size);
            self.record_miss();
            return Ok(data);
        }

        let dependencies = Self::create_project_dependencies(&data);

        let cached_result = CachedQueryResult {
            data: data.clone(),
            executed_at: Utc::now(),
            expires_at: Utc::now()
                + chrono::Duration::from_std(self.config.ttl).unwrap_or_default(),
            execution_time_ms: execution_time,
            params_hash: params_hash.to_string(),
            dependencies,
            result_size,
            compressed: self.config.enable_compression,
        };

        self.projects_cache
            .insert(query_key.to_string(), cached_result)
            .await;

        self.update_stats(result_size, execution_time, false);

        self.record_miss();
        debug!(
            "Cached projects query: {} ({}ms, {} bytes)",
            query_key, execution_time, result_size
        );
        Ok(data)
    }

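    /// Returns the cached result for an areas query, or runs `fetcher` and
    /// caches its result.
    ///
    /// # Errors
    ///
    /// Returns an error if `fetcher` fails.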
    pub async fn cache_areas_query<F, Fut>(
        &self,
        query_key: &str,
        params_hash: &str,
        fetcher: F,
    ) -> Result<Vec<Area>>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<Vec<Area>>>,
    {
        if let Some(cached) = self.areas_cache.get(query_key).await {
            if !cached.is_expired() && cached.params_hash == params_hash {
                self.record_hit();
                debug!("Query cache hit for areas: {}", query_key);
                return Ok(cached.data);
            }
        }

        let start_time = std::time::Instant::now();
        let data = fetcher().await?;
        #[allow(clippy::cast_possible_truncation)]
        let execution_time = start_time.elapsed().as_millis() as u64;

        let result_size = Self::calculate_result_size(&data);
        if result_size > self.config.max_result_size {
            warn!("Query result too large to cache: {} bytes", result_size);
            self.record_miss();
            return Ok(data);
        }

        let dependencies = Self::create_area_dependencies(&data);

        let cached_result = CachedQueryResult {
            data: data.clone(),
            executed_at: Utc::now(),
            expires_at: Utc::now()
                + chrono::Duration::from_std(self.config.ttl).unwrap_or_default(),
            execution_time_ms: execution_time,
            params_hash: params_hash.to_string(),
            dependencies,
            result_size,
            compressed: self.config.enable_compression,
        };

        self.areas_cache
            .insert(query_key.to_string(), cached_result)
            .await;

        self.update_stats(result_size, execution_time, false);

        self.record_miss();
        debug!(
            "Cached areas query: {} ({}ms, {} bytes)",
            query_key, execution_time, result_size
        );
        Ok(data)
    }

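    /// Returns the cached result for a search query, or runs `fetcher` and
    /// caches its result.
    ///
    /// # Errors
    ///
    /// Returns an error if `fetcher` fails.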
    pub async fn cache_search_query<F, Fut>(
        &self,
        query_key: &str,
        params_hash: &str,
        fetcher: F,
    ) -> Result<Vec<Task>>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<Vec<Task>>>,
    {
        if let Some(cached) = self.search_cache.get(query_key).await {
            if !cached.is_expired() && cached.params_hash == params_hash {
                self.record_hit();
                debug!("Query cache hit for search: {}", query_key);
                return Ok(cached.data);
            }
        }

        let start_time = std::time::Instant::now();
        let data = fetcher().await?;
        #[allow(clippy::cast_possible_truncation)]
        let execution_time = start_time.elapsed().as_millis() as u64;

        let result_size = Self::calculate_result_size(&data);
        if result_size > self.config.max_result_size {
            warn!("Query result too large to cache: {} bytes", result_size);
            self.record_miss();
            return Ok(data);
        }

        let dependencies = Self::create_task_dependencies(&data);

        let cached_result = CachedQueryResult {
            data: data.clone(),
            executed_at: Utc::now(),
            expires_at: Utc::now()
                + chrono::Duration::from_std(self.config.ttl).unwrap_or_default(),
            execution_time_ms: execution_time,
            params_hash: params_hash.to_string(),
            dependencies,
            result_size,
            compressed: self.config.enable_compression,
        };

        self.search_cache
            .insert(query_key.to_string(), cached_result)
            .await;

        self.update_stats(result_size, execution_time, false);

        self.record_miss();
        debug!(
            "Cached search query: {} ({}ms, {} bytes)",
            query_key, execution_time, result_size
        );
        Ok(data)
    }

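    /// Invalidates cached queries affected by a change to the given entity.
    ///
    /// Currently clears all query caches regardless of the specific entity.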
    pub fn invalidate_by_entity(&self, entity_type: &str, entity_id: Option<&Uuid>) {
        self.tasks_cache.invalidate_all();
        self.projects_cache.invalidate_all();
        self.areas_cache.invalidate_all();
        self.search_cache.invalidate_all();

        debug!(
            "Invalidated all query caches due to entity change: {} {:?}",
            entity_type, entity_id
        );
    }

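    /// Invalidates the caches affected by the given data operation.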
    pub fn invalidate_by_operation(&self, operation: &str) {
        match operation {
            "task_created" | "task_updated" | "task_deleted" | "task_completed" => {
                self.tasks_cache.invalidate_all();
                self.search_cache.invalidate_all();
            }
            "project_created" | "project_updated" | "project_deleted" => {
                self.projects_cache.invalidate_all();
                self.tasks_cache.invalidate_all();
            }
            "area_created" | "area_updated" | "area_deleted" => {
                self.areas_cache.invalidate_all();
                self.projects_cache.invalidate_all();
                self.tasks_cache.invalidate_all();
            }
            _ => {
                // Unknown operation: conservatively clear everything.
                self.invalidate_all();
            }
        }

        debug!("Invalidated query caches due to operation: {}", operation);
    }

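    /// Clears all query caches.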
    pub fn invalidate_all(&self) {
        self.tasks_cache.invalidate_all();
        self.projects_cache.invalidate_all();
        self.areas_cache.invalidate_all();
        self.search_cache.invalidate_all();
    }

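    /// Returns a snapshot of the current cache statistics with the hit rate
    /// recalculated.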
    #[must_use]
    pub fn get_stats(&self) -> QueryCacheStats {
        let mut stats = self.stats.read().clone();
        stats.calculate_hit_rate();
        stats
    }

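    /// Estimates the size of a result by serializing it to JSON; returns 0 if
    /// serialization fails.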
    fn calculate_result_size<T>(data: &T) -> usize
    where
        T: Serialize,
    {
        serde_json::to_vec(data).map_or(0, |bytes| bytes.len())
    }

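    /// Builds the dependency list for a tasks query result.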
    fn create_task_dependencies(tasks: &[Task]) -> Vec<QueryDependency> {
        let mut dependencies = Vec::new();

        dependencies.push(QueryDependency {
            table: "TMTask".to_string(),
            entity_id: None,
            invalidating_operations: vec![
                "task_created".to_string(),
                "task_updated".to_string(),
                "task_deleted".to_string(),
                "task_completed".to_string(),
            ],
        });

        for task in tasks {
            dependencies.push(QueryDependency {
                table: "TMTask".to_string(),
                entity_id: Some(task.uuid),
                invalidating_operations: vec![
                    "task_updated".to_string(),
                    "task_deleted".to_string(),
                    "task_completed".to_string(),
                ],
            });

            if let Some(project_uuid) = task.project_uuid {
                dependencies.push(QueryDependency {
                    table: "TMProject".to_string(),
                    entity_id: Some(project_uuid),
                    invalidating_operations: vec![
                        "project_updated".to_string(),
                        "project_deleted".to_string(),
                    ],
                });
            }

            if let Some(area_uuid) = task.area_uuid {
                dependencies.push(QueryDependency {
                    table: "TMArea".to_string(),
                    entity_id: Some(area_uuid),
                    invalidating_operations: vec![
                        "area_updated".to_string(),
                        "area_deleted".to_string(),
                    ],
                });
            }
        }

        dependencies
    }

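    /// Builds the dependency list for a projects query result.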
    fn create_project_dependencies(projects: &[Project]) -> Vec<QueryDependency> {
        let mut dependencies = Vec::new();

        dependencies.push(QueryDependency {
            table: "TMProject".to_string(),
            entity_id: None,
            invalidating_operations: vec![
                "project_created".to_string(),
                "project_updated".to_string(),
                "project_deleted".to_string(),
            ],
        });

        for project in projects {
            dependencies.push(QueryDependency {
                table: "TMProject".to_string(),
                entity_id: Some(project.uuid),
                invalidating_operations: vec![
                    "project_updated".to_string(),
                    "project_deleted".to_string(),
                ],
            });

            if let Some(area_uuid) = project.area_uuid {
                dependencies.push(QueryDependency {
                    table: "TMArea".to_string(),
                    entity_id: Some(area_uuid),
                    invalidating_operations: vec![
                        "area_updated".to_string(),
                        "area_deleted".to_string(),
                    ],
                });
            }
        }

        dependencies
    }

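    /// Builds the dependency list for an areas query result.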
    fn create_area_dependencies(areas: &[Area]) -> Vec<QueryDependency> {
        let mut dependencies = Vec::new();

        dependencies.push(QueryDependency {
            table: "TMArea".to_string(),
            entity_id: None,
            invalidating_operations: vec![
                "area_created".to_string(),
                "area_updated".to_string(),
                "area_deleted".to_string(),
            ],
        });

        for area in areas {
            dependencies.push(QueryDependency {
                table: "TMArea".to_string(),
                entity_id: Some(area.uuid),
                invalidating_operations: vec![
                    "area_updated".to_string(),
                    "area_deleted".to_string(),
                ],
            });
        }

        dependencies
    }

    fn record_hit(&self) {
        let mut stats = self.stats.write();
        stats.hits += 1;
    }

    fn record_miss(&self) {
        let mut stats = self.stats.write();
        stats.misses += 1;
    }

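    /// Updates aggregate statistics after a query has been executed and cached.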
    #[allow(clippy::cast_precision_loss)]
    fn update_stats(&self, result_size: usize, execution_time_ms: u64, compressed: bool) {
        let mut stats = self.stats.write();
        stats.total_queries += 1;
        stats.total_size_bytes += result_size as u64;

        let total_queries = stats.total_queries as f64;
        let current_avg = stats.average_execution_time_ms;
        stats.average_execution_time_ms =
            (current_avg * (total_queries - 1.0) + execution_time_ms as f64) / total_queries;

        if compressed {
            stats.compressed_queries += 1;
        } else {
            stats.uncompressed_queries += 1;
        }
    }
}

impl<T> CachedQueryResult<T> {
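    /// Returns `true` if the cached result has passed its expiration time.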
    pub fn is_expired(&self) -> bool {
        Utc::now() > self.expires_at
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::TaskStatus;
    use crate::test_utils::create_mock_tasks;

    #[tokio::test]
    async fn test_query_cache_basic_operations() {
        let cache = QueryCache::new_default();

        let tasks = create_mock_tasks();
        let query_key = "test_tasks_query";
        let params_hash = "test_params_hash";

        let result = cache
            .cache_tasks_query(query_key, params_hash, || async { Ok(tasks.clone()) })
            .await
            .unwrap();

        assert_eq!(result.len(), tasks.len());

        let cached_result = cache
            .cache_tasks_query(query_key, params_hash, || async {
                panic!("Should not execute fetcher on cache hit");
            })
            .await
            .unwrap();

        assert_eq!(cached_result.len(), tasks.len());

        let different_params = "different_params_hash";
        let _ = cache
            .cache_tasks_query(query_key, different_params, || async {
                Ok(create_mock_tasks())
            })
            .await
            .unwrap();

        let stats = cache.get_stats();
        assert!(stats.hits >= 1);
        assert!(stats.misses >= 1);
    }

    #[tokio::test]
    async fn test_query_cache_invalidation() {
        let cache = QueryCache::new_default();

        let tasks = create_mock_tasks();
        cache
            .cache_tasks_query("test_query", "params", || async { Ok(tasks.clone()) })
            .await
            .unwrap();

        cache.invalidate_by_operation("task_updated");

        let _ = cache
            .cache_tasks_query("test_query", "params", || async { Ok(create_mock_tasks()) })
            .await
            .unwrap();

        let stats = cache.get_stats();
        assert!(stats.misses >= 2);
    }

    #[tokio::test]
    async fn test_query_cache_dependencies() {
        let _cache = QueryCache::new_default();

        let tasks = create_mock_tasks();
        let dependencies = QueryCache::create_task_dependencies(&tasks);

        assert!(!dependencies.is_empty());
        assert!(dependencies.iter().any(|dep| dep.table == "TMTask"));
    }

    #[tokio::test]
    async fn test_query_cache_projects_query() {
        let cache = QueryCache::new_default();

        let projects = vec![Project {
            uuid: Uuid::new_v4(),
            title: "Project 1".to_string(),
            area_uuid: Some(Uuid::new_v4()),
            created: Utc::now(),
            modified: Utc::now(),
            status: TaskStatus::Incomplete,
            notes: Some("Notes".to_string()),
            deadline: None,
            start_date: None,
            tags: vec![],
            tasks: vec![],
        }];

        let query_key = "test_projects_query";
        let params_hash = "test_params";

        let result = cache
            .cache_projects_query(query_key, params_hash, || async { Ok(projects.clone()) })
            .await
            .unwrap();

        assert_eq!(result.len(), projects.len());

        let cached_result = cache
            .cache_projects_query(query_key, params_hash, || async {
                panic!("Should not execute fetcher on cache hit");
            })
            .await
            .unwrap();

        assert_eq!(cached_result.len(), projects.len());
    }

    #[tokio::test]
    async fn test_query_cache_config_default() {
        let config = QueryCacheConfig::default();
        assert_eq!(config.max_queries, 1000);
        assert_eq!(config.ttl, Duration::from_secs(1800));
        assert_eq!(config.tti, Duration::from_secs(300));
        assert!(config.enable_compression);
        assert_eq!(config.max_result_size, 1024 * 1024);
    }

    #[tokio::test]
    async fn test_cached_query_result_creation() {
        let tasks = create_mock_tasks();
        let now = Utc::now();
        let expires_at = now + chrono::Duration::seconds(1800);

        let dependency = QueryDependency {
            table: "TMTask".to_string(),
            entity_id: None,
            invalidating_operations: vec![
                "INSERT".to_string(),
                "UPDATE".to_string(),
                "DELETE".to_string(),
            ],
        };

        let result = CachedQueryResult {
            data: tasks.clone(),
            executed_at: now,
            expires_at,
            execution_time_ms: 100,
            params_hash: "test_hash".to_string(),
            result_size: 1024,
            dependencies: vec![dependency.clone()],
            compressed: false,
        };

        assert_eq!(result.data.len(), tasks.len());
        assert_eq!(result.execution_time_ms, 100);
        assert_eq!(result.result_size, 1024);
        assert_eq!(result.params_hash, "test_hash");
        assert_eq!(result.dependencies, vec![dependency]);
        assert!(!result.compressed);
    }

    #[tokio::test]
    async fn test_query_cache_areas_query() {
        let cache = QueryCache::new_default();

        let areas = vec![Area {
            uuid: Uuid::new_v4(),
            title: "Area 1".to_string(),
            created: Utc::now(),
            modified: Utc::now(),
            notes: Some("Notes".to_string()),
            tags: vec![],
            projects: vec![],
        }];

        let query_key = "test_areas_query";
        let params_hash = "test_params";

        let result = cache
            .cache_areas_query(query_key, params_hash, || async { Ok(areas.clone()) })
            .await
            .unwrap();

        assert_eq!(result.len(), areas.len());

        let cached_result = cache
            .cache_areas_query(query_key, params_hash, || async {
                panic!("Should not execute fetcher on cache hit");
            })
            .await
            .unwrap();

        assert_eq!(cached_result.len(), areas.len());
    }

    #[tokio::test]
    async fn test_query_cache_expiration() {
        let config = QueryCacheConfig {
            max_queries: 100,
            ttl: Duration::from_millis(10),
            tti: Duration::from_millis(5),
            enable_compression: false,
            max_result_size: 1024,
        };
        let cache = QueryCache::new(config);

        let tasks = create_mock_tasks();
        let query_key = "test_expiration";
        let params_hash = "test_params";

        let _result = cache
            .cache_tasks_query(query_key, params_hash, || async { Ok(tasks.clone()) })
            .await
            .unwrap();

        tokio::time::sleep(Duration::from_millis(20)).await;

        let mut fetcher_called = false;
        let _expired_result = cache
            .cache_tasks_query(query_key, params_hash, || async {
                fetcher_called = true;
                Ok(tasks.clone())
            })
            .await
            .unwrap();

        assert!(fetcher_called);
    }

    #[tokio::test]
    async fn test_query_cache_size_limit() {
        let config = QueryCacheConfig {
            max_queries: 2,
            ttl: Duration::from_secs(300),
            tti: Duration::from_secs(60),
            enable_compression: false,
            max_result_size: 1024,
        };
        let cache = QueryCache::new(config);

        let tasks = create_mock_tasks();

        let _result1 = cache
            .cache_tasks_query("key1", "params1", || async { Ok(tasks.clone()) })
            .await
            .unwrap();

        let _result2 = cache
            .cache_tasks_query("key2", "params2", || async { Ok(tasks.clone()) })
            .await
            .unwrap();

        let _result3 = cache
            .cache_tasks_query("key3", "params3", || async { Ok(tasks.clone()) })
            .await
            .unwrap();

        let stats = cache.get_stats();
        assert!(stats.total_queries <= 10);
    }

    #[tokio::test]
    async fn test_query_cache_concurrent_access() {
        let cache = Arc::new(QueryCache::new_default());
        let tasks = create_mock_tasks();

        let mut handles = vec![];

        for i in 0..10 {
            let cache_clone = cache.clone();
            let tasks_clone = tasks.clone();
            let handle = tokio::spawn(async move {
                let key = format!("concurrent_key_{i}");
                let params = format!("params_{i}");
                let result = cache_clone
                    .cache_tasks_query(&key, &params, || async { Ok(tasks_clone.clone()) })
                    .await
                    .unwrap();
                assert!(!result.is_empty());
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.await.unwrap();
        }
    }

    #[tokio::test]
    async fn test_query_cache_error_handling() {
        let cache = QueryCache::new_default();

        let query_key = "error_test";
        let params_hash = "test_params";

        let result = cache
            .cache_tasks_query(query_key, params_hash, || async {
                Err(anyhow::anyhow!("Test error"))
            })
            .await;

        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_query_cache_compression() {
        let config = QueryCacheConfig {
            max_queries: 100,
            ttl: Duration::from_secs(300),
            tti: Duration::from_secs(60),
            enable_compression: true,
            max_result_size: 1024 * 1024,
        };
        let cache = QueryCache::new(config);

        let tasks = create_mock_tasks();
        let query_key = "compression_test";
        let params_hash = "test_params";

        let result = cache
            .cache_tasks_query(query_key, params_hash, || async { Ok(tasks.clone()) })
            .await
            .unwrap();

        assert_eq!(result.len(), tasks.len());

        let cached_result = cache
            .cache_tasks_query(query_key, params_hash, || async {
                panic!("Should not execute fetcher on cache hit");
            })
            .await
            .unwrap();

        assert_eq!(cached_result.len(), tasks.len());
    }
}