1use crate::models::{Area, Project, Task, ThingsId};
4use anyhow::Result;
5use chrono::{DateTime, Utc};
6use moka::future::Cache;
7use parking_lot::RwLock;
8use serde::{Deserialize, Serialize};
9use std::sync::Arc;
10use std::time::Duration;
11use tracing::{debug, warn};
12
/// Tuning knobs for [`QueryCache`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryCacheConfig {
    /// Maximum number of entries per entity cache (tasks, projects, areas, search).
    pub max_queries: u64,
    /// Time-to-live: entries are evicted this long after insertion.
    pub ttl: Duration,
    /// Time-to-idle: entries are evicted after going unread for this long.
    pub tti: Duration,
    /// Whether cached results are flagged as compressed.
    /// NOTE(review): no compression is actually performed anywhere in this
    /// module — the flag is only recorded on entries/stats; confirm intent.
    pub enable_compression: bool,
    /// Largest serialized result (bytes) that will be cached; bigger results
    /// are returned to the caller uncached.
    pub max_result_size: usize,
}
27
28impl Default for QueryCacheConfig {
29 fn default() -> Self {
30 Self {
31 max_queries: 1000,
32 ttl: Duration::from_secs(1800), tti: Duration::from_secs(300), enable_compression: true,
35 max_result_size: 1024 * 1024, }
37 }
38}
39
/// A cached query result plus the metadata needed for expiry checks,
/// parameter matching, and dependency-based invalidation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedQueryResult<T> {
    /// The query's result payload.
    pub data: T,
    /// When the underlying query was executed.
    pub executed_at: DateTime<Utc>,
    /// Wall-clock expiry timestamp; checked by `is_expired`.
    pub expires_at: DateTime<Utc>,
    /// How long the underlying query took, in milliseconds.
    pub execution_time_ms: u64,
    /// Hash of the query parameters; a lookup only counts as a hit
    /// when this matches the caller's hash.
    pub params_hash: String,
    /// Tables/entities whose mutations should invalidate this entry.
    pub dependencies: Vec<QueryDependency>,
    /// Serialized (JSON) size of `data`, in bytes.
    pub result_size: usize,
    /// Whether the entry was flagged as compressed when stored.
    pub compressed: bool,
}
60
/// One invalidation dependency of a cached query: a table, optionally a
/// specific entity in it, and the operation names that make the entry stale.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct QueryDependency {
    /// Source table name (e.g. "TMTask", "TMProject", "TMArea").
    pub table: String,
    /// Specific entity this entry depends on; `None` means the whole table.
    pub entity_id: Option<ThingsId>,
    /// Operation names (e.g. "task_updated") that invalidate the entry.
    pub invalidating_operations: Vec<String>,
}
71
/// Aggregate statistics across all four entity caches.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct QueryCacheStats {
    /// Number of queries that were executed and recorded via `update_stats`.
    pub total_queries: u64,
    /// Lookups served from cache.
    pub hits: u64,
    /// Lookups that fell through to the fetcher.
    pub misses: u64,
    /// hits / (hits + misses); refreshed by `calculate_hit_rate`.
    pub hit_rate: f64,
    /// Sum of serialized result sizes recorded so far, in bytes.
    pub total_size_bytes: u64,
    /// Running mean of query execution time, in milliseconds.
    pub average_execution_time_ms: f64,
    /// Count of cached results flagged as compressed.
    pub compressed_queries: u64,
    /// Count of cached results flagged as uncompressed.
    pub uncompressed_queries: u64,
}
84
85impl QueryCacheStats {
86 pub fn calculate_hit_rate(&mut self) {
87 let total = self.hits + self.misses;
88 self.hit_rate = if total > 0 {
89 #[allow(clippy::cast_precision_loss)]
90 {
91 self.hits as f64 / total as f64
92 }
93 } else {
94 0.0
95 };
96 }
97}
98
/// Multi-entity query result cache backed by per-entity moka caches.
///
/// Tasks, projects, areas and search results are cached independently so they
/// can be invalidated independently; hit/miss statistics are shared.
pub struct QueryCache {
    /// Cached task-list queries, keyed by query key.
    tasks_cache: Cache<String, CachedQueryResult<Vec<Task>>>,
    /// Cached project-list queries.
    projects_cache: Cache<String, CachedQueryResult<Vec<Project>>>,
    /// Cached area-list queries.
    areas_cache: Cache<String, CachedQueryResult<Vec<Area>>>,
    /// Cached search queries (search results are tasks).
    search_cache: Cache<String, CachedQueryResult<Vec<Task>>>,
    /// Shared statistics, guarded by a parking_lot RwLock.
    stats: Arc<RwLock<QueryCacheStats>>,
    /// Configuration captured at construction time.
    config: QueryCacheConfig,
}
114
115impl QueryCache {
116 #[must_use]
118 pub fn new(config: QueryCacheConfig) -> Self {
119 let tasks_cache = Cache::builder()
120 .max_capacity(config.max_queries)
121 .time_to_live(config.ttl)
122 .time_to_idle(config.tti)
123 .build();
124
125 let projects_cache = Cache::builder()
126 .max_capacity(config.max_queries)
127 .time_to_live(config.ttl)
128 .time_to_idle(config.tti)
129 .build();
130
131 let areas_cache = Cache::builder()
132 .max_capacity(config.max_queries)
133 .time_to_live(config.ttl)
134 .time_to_idle(config.tti)
135 .build();
136
137 let search_cache = Cache::builder()
138 .max_capacity(config.max_queries)
139 .time_to_live(config.ttl)
140 .time_to_idle(config.tti)
141 .build();
142
143 Self {
144 tasks_cache,
145 projects_cache,
146 areas_cache,
147 search_cache,
148 stats: Arc::new(RwLock::new(QueryCacheStats::default())),
149 config,
150 }
151 }
152
    /// Convenience constructor using [`QueryCacheConfig::default`].
    #[must_use]
    pub fn new_default() -> Self {
        Self::new(QueryCacheConfig::default())
    }
158
159 pub async fn cache_tasks_query<F, Fut>(
168 &self,
169 query_key: &str,
170 params_hash: &str,
171 fetcher: F,
172 ) -> Result<Vec<Task>>
173 where
174 F: FnOnce() -> Fut,
175 Fut: std::future::Future<Output = Result<Vec<Task>>>,
176 {
177 if let Some(cached) = self.tasks_cache.get(query_key).await {
179 if !cached.is_expired() && cached.params_hash == params_hash {
180 self.record_hit();
181 debug!("Query cache hit for tasks: {}", query_key);
182 return Ok(cached.data);
183 }
184 }
185
186 let start_time = std::time::Instant::now();
188 let data = fetcher().await?;
189 #[allow(clippy::cast_possible_truncation)]
190 let execution_time = start_time.elapsed().as_millis() as u64;
191
192 let result_size = Self::calculate_result_size(&data);
194 if result_size > self.config.max_result_size {
195 warn!("Query result too large to cache: {} bytes", result_size);
196 self.record_miss();
197 return Ok(data);
198 }
199
200 let dependencies = Self::create_task_dependencies(&data);
202
203 let cached_result = CachedQueryResult {
205 data: data.clone(),
206 executed_at: Utc::now(),
207 expires_at: Utc::now()
208 + chrono::Duration::from_std(self.config.ttl).unwrap_or_default(),
209 execution_time_ms: execution_time,
210 params_hash: params_hash.to_string(),
211 dependencies,
212 result_size,
213 compressed: self.config.enable_compression,
214 };
215
216 self.tasks_cache
218 .insert(query_key.to_string(), cached_result)
219 .await;
220
221 self.update_stats(result_size, execution_time, false);
223
224 self.record_miss();
225 debug!(
226 "Cached tasks query: {} ({}ms, {} bytes)",
227 query_key, execution_time, result_size
228 );
229 Ok(data)
230 }
231
232 pub async fn cache_projects_query<F, Fut>(
241 &self,
242 query_key: &str,
243 params_hash: &str,
244 fetcher: F,
245 ) -> Result<Vec<Project>>
246 where
247 F: FnOnce() -> Fut,
248 Fut: std::future::Future<Output = Result<Vec<Project>>>,
249 {
250 if let Some(cached) = self.projects_cache.get(query_key).await {
252 if !cached.is_expired() && cached.params_hash == params_hash {
253 self.record_hit();
254 debug!("Query cache hit for projects: {}", query_key);
255 return Ok(cached.data);
256 }
257 }
258
259 let start_time = std::time::Instant::now();
261 let data = fetcher().await?;
262 #[allow(clippy::cast_possible_truncation)]
263 let execution_time = start_time.elapsed().as_millis() as u64;
264
265 let result_size = Self::calculate_result_size(&data);
267 if result_size > self.config.max_result_size {
268 warn!("Query result too large to cache: {} bytes", result_size);
269 self.record_miss();
270 return Ok(data);
271 }
272
273 let dependencies = Self::create_project_dependencies(&data);
275
276 let cached_result = CachedQueryResult {
278 data: data.clone(),
279 executed_at: Utc::now(),
280 expires_at: Utc::now()
281 + chrono::Duration::from_std(self.config.ttl).unwrap_or_default(),
282 execution_time_ms: execution_time,
283 params_hash: params_hash.to_string(),
284 dependencies,
285 result_size,
286 compressed: self.config.enable_compression,
287 };
288
289 self.projects_cache
291 .insert(query_key.to_string(), cached_result)
292 .await;
293
294 self.update_stats(result_size, execution_time, false);
296
297 self.record_miss();
298 debug!(
299 "Cached projects query: {} ({}ms, {} bytes)",
300 query_key, execution_time, result_size
301 );
302 Ok(data)
303 }
304
305 pub async fn cache_areas_query<F, Fut>(
314 &self,
315 query_key: &str,
316 params_hash: &str,
317 fetcher: F,
318 ) -> Result<Vec<Area>>
319 where
320 F: FnOnce() -> Fut,
321 Fut: std::future::Future<Output = Result<Vec<Area>>>,
322 {
323 if let Some(cached) = self.areas_cache.get(query_key).await {
325 if !cached.is_expired() && cached.params_hash == params_hash {
326 self.record_hit();
327 debug!("Query cache hit for areas: {}", query_key);
328 return Ok(cached.data);
329 }
330 }
331
332 let start_time = std::time::Instant::now();
334 let data = fetcher().await?;
335 #[allow(clippy::cast_possible_truncation)]
336 let execution_time = start_time.elapsed().as_millis() as u64;
337
338 let result_size = Self::calculate_result_size(&data);
340 if result_size > self.config.max_result_size {
341 warn!("Query result too large to cache: {} bytes", result_size);
342 self.record_miss();
343 return Ok(data);
344 }
345
346 let dependencies = Self::create_area_dependencies(&data);
348
349 let cached_result = CachedQueryResult {
351 data: data.clone(),
352 executed_at: Utc::now(),
353 expires_at: Utc::now()
354 + chrono::Duration::from_std(self.config.ttl).unwrap_or_default(),
355 execution_time_ms: execution_time,
356 params_hash: params_hash.to_string(),
357 dependencies,
358 result_size,
359 compressed: self.config.enable_compression,
360 };
361
362 self.areas_cache
364 .insert(query_key.to_string(), cached_result)
365 .await;
366
367 self.update_stats(result_size, execution_time, false);
369
370 self.record_miss();
371 debug!(
372 "Cached areas query: {} ({}ms, {} bytes)",
373 query_key, execution_time, result_size
374 );
375 Ok(data)
376 }
377
378 pub async fn cache_search_query<F, Fut>(
387 &self,
388 query_key: &str,
389 params_hash: &str,
390 fetcher: F,
391 ) -> Result<Vec<Task>>
392 where
393 F: FnOnce() -> Fut,
394 Fut: std::future::Future<Output = Result<Vec<Task>>>,
395 {
396 if let Some(cached) = self.search_cache.get(query_key).await {
398 if !cached.is_expired() && cached.params_hash == params_hash {
399 self.record_hit();
400 debug!("Query cache hit for search: {}", query_key);
401 return Ok(cached.data);
402 }
403 }
404
405 let start_time = std::time::Instant::now();
407 let data = fetcher().await?;
408 #[allow(clippy::cast_possible_truncation)]
409 let execution_time = start_time.elapsed().as_millis() as u64;
410
411 let result_size = Self::calculate_result_size(&data);
413 if result_size > self.config.max_result_size {
414 warn!("Query result too large to cache: {} bytes", result_size);
415 self.record_miss();
416 return Ok(data);
417 }
418
419 let dependencies = Self::create_task_dependencies(&data);
421
422 let cached_result = CachedQueryResult {
424 data: data.clone(),
425 executed_at: Utc::now(),
426 expires_at: Utc::now()
427 + chrono::Duration::from_std(self.config.ttl).unwrap_or_default(),
428 execution_time_ms: execution_time,
429 params_hash: params_hash.to_string(),
430 dependencies,
431 result_size,
432 compressed: self.config.enable_compression,
433 };
434
435 self.search_cache
437 .insert(query_key.to_string(), cached_result)
438 .await;
439
440 self.update_stats(result_size, execution_time, false);
442
443 self.record_miss();
444 debug!(
445 "Cached search query: {} ({}ms, {} bytes)",
446 query_key, execution_time, result_size
447 );
448 Ok(data)
449 }
450
    /// Invalidates cached queries after a change to the given entity.
    ///
    /// Currently coarse-grained: regardless of `entity_type`/`entity_id`, all
    /// four caches are cleared — the parameters are only used for logging.
    /// The per-entry `dependencies` metadata is not consulted here (yet).
    pub fn invalidate_by_entity(&self, entity_type: &str, entity_id: Option<&ThingsId>) {
        self.tasks_cache.invalidate_all();
        self.projects_cache.invalidate_all();
        self.areas_cache.invalidate_all();
        self.search_cache.invalidate_all();

        debug!(
            "Invalidated all query caches due to entity change: {} {:?}",
            entity_type, entity_id
        );
    }
465
466 pub fn invalidate_by_operation(&self, operation: &str) {
468 match operation {
469 "task_created" | "task_updated" | "task_deleted" | "task_completed" => {
470 self.tasks_cache.invalidate_all();
471 self.search_cache.invalidate_all();
472 }
473 "project_created" | "project_updated" | "project_deleted" => {
474 self.projects_cache.invalidate_all();
475 self.tasks_cache.invalidate_all(); }
477 "area_created" | "area_updated" | "area_deleted" => {
478 self.areas_cache.invalidate_all();
479 self.projects_cache.invalidate_all(); self.tasks_cache.invalidate_all(); }
482 _ => {
483 self.invalidate_all();
485 }
486 }
487
488 debug!("Invalidated query caches due to operation: {}", operation);
489 }
490
    /// Clears every entry from all four query caches.
    pub fn invalidate_all(&self) {
        self.tasks_cache.invalidate_all();
        self.projects_cache.invalidate_all();
        self.areas_cache.invalidate_all();
        self.search_cache.invalidate_all();
    }
498
499 #[must_use]
501 pub fn get_stats(&self) -> QueryCacheStats {
502 let mut stats = self.stats.read().clone();
503 stats.calculate_hit_rate();
504 stats
505 }
506
507 fn calculate_result_size<T>(data: &T) -> usize
509 where
510 T: Serialize,
511 {
512 serde_json::to_vec(data).map_or(0, |bytes| bytes.len())
514 }
515
516 fn create_task_dependencies(tasks: &[Task]) -> Vec<QueryDependency> {
518 let mut dependencies = Vec::new();
519
520 dependencies.push(QueryDependency {
522 table: "TMTask".to_string(),
523 entity_id: None,
524 invalidating_operations: vec![
525 "task_created".to_string(),
526 "task_updated".to_string(),
527 "task_deleted".to_string(),
528 "task_completed".to_string(),
529 ],
530 });
531
532 for task in tasks {
534 dependencies.push(QueryDependency {
535 table: "TMTask".to_string(),
536 entity_id: Some(task.uuid.clone()),
537 invalidating_operations: vec![
538 "task_updated".to_string(),
539 "task_deleted".to_string(),
540 "task_completed".to_string(),
541 ],
542 });
543
544 if let Some(project_uuid) = &task.project_uuid {
546 dependencies.push(QueryDependency {
547 table: "TMProject".to_string(),
548 entity_id: Some(project_uuid.clone()),
549 invalidating_operations: vec![
550 "project_updated".to_string(),
551 "project_deleted".to_string(),
552 ],
553 });
554 }
555
556 if let Some(area_uuid) = &task.area_uuid {
558 dependencies.push(QueryDependency {
559 table: "TMArea".to_string(),
560 entity_id: Some(area_uuid.clone()),
561 invalidating_operations: vec![
562 "area_updated".to_string(),
563 "area_deleted".to_string(),
564 ],
565 });
566 }
567 }
568
569 dependencies
570 }
571
572 fn create_project_dependencies(projects: &[Project]) -> Vec<QueryDependency> {
574 let mut dependencies = Vec::new();
575
576 dependencies.push(QueryDependency {
578 table: "TMProject".to_string(),
579 entity_id: None,
580 invalidating_operations: vec![
581 "project_created".to_string(),
582 "project_updated".to_string(),
583 "project_deleted".to_string(),
584 ],
585 });
586
587 for project in projects {
589 dependencies.push(QueryDependency {
590 table: "TMProject".to_string(),
591 entity_id: Some(project.uuid.clone()),
592 invalidating_operations: vec![
593 "project_updated".to_string(),
594 "project_deleted".to_string(),
595 ],
596 });
597
598 if let Some(area_uuid) = &project.area_uuid {
600 dependencies.push(QueryDependency {
601 table: "TMArea".to_string(),
602 entity_id: Some(area_uuid.clone()),
603 invalidating_operations: vec![
604 "area_updated".to_string(),
605 "area_deleted".to_string(),
606 ],
607 });
608 }
609 }
610
611 dependencies
612 }
613
614 fn create_area_dependencies(areas: &[Area]) -> Vec<QueryDependency> {
616 let mut dependencies = Vec::new();
617
618 dependencies.push(QueryDependency {
620 table: "TMArea".to_string(),
621 entity_id: None,
622 invalidating_operations: vec![
623 "area_created".to_string(),
624 "area_updated".to_string(),
625 "area_deleted".to_string(),
626 ],
627 });
628
629 for area in areas {
631 dependencies.push(QueryDependency {
632 table: "TMArea".to_string(),
633 entity_id: Some(area.uuid.clone()),
634 invalidating_operations: vec![
635 "area_updated".to_string(),
636 "area_deleted".to_string(),
637 ],
638 });
639 }
640
641 dependencies
642 }
643
644 fn record_hit(&self) {
646 let mut stats = self.stats.write();
647 stats.hits += 1;
648 }
649
650 fn record_miss(&self) {
652 let mut stats = self.stats.write();
653 stats.misses += 1;
654 }
655
656 #[allow(clippy::cast_precision_loss)]
658 fn update_stats(&self, result_size: usize, execution_time_ms: u64, compressed: bool) {
659 let mut stats = self.stats.write();
660 stats.total_queries += 1;
661 stats.total_size_bytes += result_size as u64;
662
663 let total_queries = stats.total_queries as f64;
665 let current_avg = stats.average_execution_time_ms;
666 stats.average_execution_time_ms =
667 (current_avg * (total_queries - 1.0) + execution_time_ms as f64) / total_queries;
668
669 if compressed {
670 stats.compressed_queries += 1;
671 } else {
672 stats.uncompressed_queries += 1;
673 }
674 }
675}
676
677impl<T> CachedQueryResult<T> {
678 pub fn is_expired(&self) -> bool {
680 Utc::now() > self.expires_at
681 }
682}
683
684#[cfg(test)]
685mod tests {
686 use super::*;
687 use crate::models::TaskStatus;
688 use crate::test_utils::create_mock_tasks;
689
    /// Miss → hit → miss-on-different-params flow for the tasks cache.
    #[tokio::test]
    async fn test_query_cache_basic_operations() {
        let cache = QueryCache::new_default();

        let tasks = create_mock_tasks();
        let query_key = "test_tasks_query";
        let params_hash = "test_params_hash";

        // First call: miss — the fetcher runs and the result is cached.
        let result = cache
            .cache_tasks_query(query_key, params_hash, || async { Ok(tasks.clone()) })
            .await
            .unwrap();

        assert_eq!(result.len(), tasks.len());

        // Second call with identical key+params: must be served from cache.
        let cached_result = cache
            .cache_tasks_query(query_key, params_hash, || async {
                panic!("Should not execute fetcher on cache hit");
            })
            .await
            .unwrap();

        assert_eq!(cached_result.len(), tasks.len());

        // Same key but a different params hash: treated as a miss.
        let different_params = "different_params_hash";
        let _ = cache
            .cache_tasks_query(query_key, different_params, || async {
                Ok(create_mock_tasks())
            })
            .await
            .unwrap();

        let stats = cache.get_stats();
        assert!(stats.hits >= 1);
        assert!(stats.misses >= 1);
    }
729
    /// A task mutation must invalidate cached task queries.
    #[tokio::test]
    async fn test_query_cache_invalidation() {
        let cache = QueryCache::new_default();

        // Prime the cache with one tasks query.
        let tasks = create_mock_tasks();
        cache
            .cache_tasks_query("test_query", "params", || async { Ok(tasks.clone()) })
            .await
            .unwrap();

        cache.invalidate_by_operation("task_updated");

        // Same key again: should be a miss and re-run the fetcher.
        let _ = cache
            .cache_tasks_query("test_query", "params", || async { Ok(create_mock_tasks()) })
            .await
            .unwrap();

        let stats = cache.get_stats();
        assert!(stats.misses >= 2);
    }
753
    /// Task dependency lists always include a table-level TMTask entry.
    #[tokio::test]
    async fn test_query_cache_dependencies() {
        let _cache = QueryCache::new_default();

        let tasks = create_mock_tasks();
        let dependencies = QueryCache::create_task_dependencies(&tasks);

        assert!(!dependencies.is_empty());
        assert!(dependencies.iter().any(|dep| dep.table == "TMTask"));
    }
764
    /// Miss-then-hit flow for the projects cache.
    #[tokio::test]
    async fn test_query_cache_projects_query() {
        let cache = QueryCache::new_default();

        let projects = vec![Project {
            uuid: ThingsId::new_v4(),
            title: "Project 1".to_string(),
            area_uuid: Some(ThingsId::new_v4()),
            created: Utc::now(),
            modified: Utc::now(),
            status: TaskStatus::Incomplete,
            notes: Some("Notes".to_string()),
            deadline: None,
            start_date: None,
            tags: vec![],
            tasks: vec![],
        }];

        let query_key = "test_projects_query";
        let params_hash = "test_params";

        // First call: miss — fetcher runs.
        let result = cache
            .cache_projects_query(query_key, params_hash, || async { Ok(projects.clone()) })
            .await
            .unwrap();

        assert_eq!(result.len(), projects.len());

        // Second call: must be served from cache without running the fetcher.
        let cached_result = cache
            .cache_projects_query(query_key, params_hash, || async {
                panic!("Should not execute fetcher on cache hit");
            })
            .await
            .unwrap();

        assert_eq!(cached_result.len(), projects.len());
    }
804
    /// Pins the documented default configuration values.
    #[tokio::test]
    async fn test_query_cache_config_default() {
        let config = QueryCacheConfig::default();
        assert_eq!(config.max_queries, 1000);
        assert_eq!(config.ttl, Duration::from_secs(1800));
        assert_eq!(config.tti, Duration::from_secs(300));
        assert!(config.enable_compression);
        assert_eq!(config.max_result_size, 1024 * 1024);
    }
814
    /// Constructs a `CachedQueryResult` by hand and checks field round-trip.
    #[tokio::test]
    async fn test_cached_query_result_creation() {
        let tasks = create_mock_tasks();
        let now = Utc::now();
        let expires_at = now + chrono::Duration::seconds(1800);

        let dependency = QueryDependency {
            table: "TMTask".to_string(),
            entity_id: None,
            invalidating_operations: vec![
                "INSERT".to_string(),
                "UPDATE".to_string(),
                "DELETE".to_string(),
            ],
        };

        let result = CachedQueryResult {
            data: tasks.clone(),
            executed_at: now,
            expires_at,
            execution_time_ms: 100,
            params_hash: "test_hash".to_string(),
            result_size: 1024,
            dependencies: vec![dependency.clone()],
            compressed: false,
        };

        assert_eq!(result.data.len(), tasks.len());
        assert_eq!(result.execution_time_ms, 100);
        assert_eq!(result.result_size, 1024);
        assert_eq!(result.params_hash, "test_hash");
        assert_eq!(result.dependencies, vec![dependency]);
        assert!(!result.compressed);
    }
849
    /// Miss-then-hit flow for the areas cache.
    #[tokio::test]
    async fn test_query_cache_areas_query() {
        let cache = QueryCache::new_default();

        let areas = vec![Area {
            uuid: ThingsId::new_v4(),
            title: "Area 1".to_string(),
            created: Utc::now(),
            modified: Utc::now(),
            notes: Some("Notes".to_string()),
            tags: vec![],
            projects: vec![],
        }];

        let query_key = "test_areas_query";
        let params_hash = "test_params";

        // First call: miss — fetcher runs.
        let result = cache
            .cache_areas_query(query_key, params_hash, || async { Ok(areas.clone()) })
            .await
            .unwrap();

        assert_eq!(result.len(), areas.len());

        // Second call: must be served from cache without running the fetcher.
        let cached_result = cache
            .cache_areas_query(query_key, params_hash, || async {
                panic!("Should not execute fetcher on cache hit");
            })
            .await
            .unwrap();

        assert_eq!(cached_result.len(), areas.len());
    }
885
886 #[tokio::test]
887 async fn test_query_cache_expiration() {
888 let config = QueryCacheConfig {
889 max_queries: 100,
890 ttl: Duration::from_millis(10), tti: Duration::from_millis(5),
892 enable_compression: false,
893 max_result_size: 1024,
894 };
895 let cache = QueryCache::new(config);
896
897 let tasks = create_mock_tasks();
898 let query_key = "test_expiration";
899 let params_hash = "test_params";
900
901 let _result = cache
903 .cache_tasks_query(query_key, params_hash, || async { Ok(tasks.clone()) })
904 .await
905 .unwrap();
906
907 tokio::time::sleep(Duration::from_millis(20)).await;
909
910 let mut fetcher_called = false;
912 let _expired_result = cache
913 .cache_tasks_query(query_key, params_hash, || async {
914 fetcher_called = true;
915 Ok(tasks.clone())
916 })
917 .await
918 .unwrap();
919
920 assert!(fetcher_called);
921 }
922
923 #[tokio::test]
924 async fn test_query_cache_size_limit() {
925 let config = QueryCacheConfig {
926 max_queries: 2, ttl: Duration::from_secs(300),
928 tti: Duration::from_secs(60),
929 enable_compression: false,
930 max_result_size: 1024,
931 };
932 let cache = QueryCache::new(config);
933
934 let tasks = create_mock_tasks();
935
936 let _result1 = cache
938 .cache_tasks_query("key1", "params1", || async { Ok(tasks.clone()) })
939 .await
940 .unwrap();
941
942 let _result2 = cache
943 .cache_tasks_query("key2", "params2", || async { Ok(tasks.clone()) })
944 .await
945 .unwrap();
946
947 let _result3 = cache
949 .cache_tasks_query("key3", "params3", || async { Ok(tasks.clone()) })
950 .await
951 .unwrap();
952
953 let stats = cache.get_stats();
956 assert!(stats.total_queries <= 10); }
959
    /// Ten concurrent tasks caching under distinct keys must all succeed.
    #[tokio::test]
    async fn test_query_cache_concurrent_access() {
        let cache = Arc::new(QueryCache::new_default());
        let tasks = create_mock_tasks();

        let mut handles = vec![];

        // Spawn ten tasks, each caching under a distinct key.
        for i in 0..10 {
            let cache_clone = cache.clone();
            let tasks_clone = tasks.clone();
            let handle = tokio::spawn(async move {
                let key = format!("concurrent_key_{i}");
                let params = format!("params_{i}");
                let result = cache_clone
                    .cache_tasks_query(&key, &params, || async { Ok(tasks_clone.clone()) })
                    .await
                    .unwrap();
                assert!(!result.is_empty());
            });
            handles.push(handle);
        }

        // Propagate any panic from the spawned tasks.
        for handle in handles {
            handle.await.unwrap();
        }
    }
988
    /// A fetcher error must be propagated, not cached.
    #[tokio::test]
    async fn test_query_cache_error_handling() {
        let cache = QueryCache::new_default();

        let query_key = "error_test";
        let params_hash = "test_params";

        let result = cache
            .cache_tasks_query(query_key, params_hash, || async {
                Err(anyhow::anyhow!("Test error"))
            })
            .await;

        assert!(result.is_err());
    }
1005
    /// Caching still works with the compression flag enabled.
    #[tokio::test]
    async fn test_query_cache_compression() {
        let config = QueryCacheConfig {
            max_queries: 100,
            ttl: Duration::from_secs(300),
            tti: Duration::from_secs(60),
            enable_compression: true,
            max_result_size: 1024 * 1024,
        };
        let cache = QueryCache::new(config);

        let tasks = create_mock_tasks();
        let query_key = "compression_test";
        let params_hash = "test_params";

        // First call: miss — fetcher runs.
        let result = cache
            .cache_tasks_query(query_key, params_hash, || async { Ok(tasks.clone()) })
            .await
            .unwrap();

        assert_eq!(result.len(), tasks.len());

        // Second call: must be served from cache without running the fetcher.
        let cached_result = cache
            .cache_tasks_query(query_key, params_hash, || async {
                panic!("Should not execute fetcher on cache hit");
            })
            .await
            .unwrap();

        assert_eq!(cached_result.len(), tasks.len());
    }
1039}