1use crate::models::{Area, Project, Task, ThingsId};
2use anyhow::Result;
3use moka::future::Cache;
4
5use super::config::{CacheConfig, CacheDependency};
6use super::stats::{CachePreloader, CacheStats, CachedData};
7use super::ThingsCache;
8
9impl ThingsCache {
10 #[must_use]
12 pub fn new(config: &CacheConfig) -> Self {
13 let tasks = Cache::builder()
14 .max_capacity(config.max_capacity)
15 .time_to_live(config.ttl)
16 .time_to_idle(config.tti)
17 .build();
18
19 let projects = Cache::builder()
20 .max_capacity(config.max_capacity)
21 .time_to_live(config.ttl)
22 .time_to_idle(config.tti)
23 .build();
24
25 let areas = Cache::builder()
26 .max_capacity(config.max_capacity)
27 .time_to_live(config.ttl)
28 .time_to_idle(config.tti)
29 .build();
30
31 let search_results = Cache::builder()
32 .max_capacity(config.max_capacity)
33 .time_to_live(config.ttl)
34 .time_to_idle(config.tti)
35 .build();
36
37 let mut cache = Self {
38 tasks,
39 projects,
40 areas,
41 search_results,
42 stats: std::sync::Arc::new(parking_lot::RwLock::new(CacheStats::default())),
43 config: config.clone(),
44 warming_entries: std::sync::Arc::new(parking_lot::RwLock::new(
45 std::collections::HashMap::new(),
46 )),
47 preloader: std::sync::Arc::new(parking_lot::RwLock::new(None)),
48 warming_task: None,
49 };
50
51 if config.enable_cache_warming {
53 cache.start_cache_warming();
54 }
55
56 cache
57 }
58
59 #[must_use]
61 pub fn new_default() -> Self {
62 Self::new(&CacheConfig::default())
63 }
64
65 pub async fn get_tasks<F, Fut>(&self, key: &str, fetcher: F) -> Result<Vec<Task>>
71 where
72 F: FnOnce() -> Fut,
73 Fut: std::future::Future<Output = Result<Vec<Task>>>,
74 {
75 if let Some(mut cached) = self.tasks.get(key).await {
76 if !cached.is_expired() && !cached.is_idle(self.config.tti) {
77 cached.record_access();
78 self.record_hit();
79
80 if cached.access_count > 3 {
82 self.add_to_warming(key.to_string(), cached.warming_priority + 1);
83 }
84
85 self.notify_preloader(key);
86 return Ok(cached.data);
87 }
88 }
89
90 self.record_miss();
91 let data = fetcher().await?;
92
93 let dependencies = Self::create_task_dependencies(&data);
95 let mut cached_data =
96 CachedData::new_with_dependencies(data.clone(), self.config.ttl, dependencies);
97
98 let priority = if key.starts_with("inbox:") {
100 10
101 } else if key.starts_with("today:") {
102 8
103 } else {
104 5
105 };
106 cached_data.update_warming_priority(priority);
107
108 self.tasks.insert(key.to_string(), cached_data).await;
109 self.notify_preloader(key);
110 Ok(data)
111 }
112
113 pub async fn get_projects<F, Fut>(&self, key: &str, fetcher: F) -> Result<Vec<Project>>
119 where
120 F: FnOnce() -> Fut,
121 Fut: std::future::Future<Output = Result<Vec<Project>>>,
122 {
123 if let Some(mut cached) = self.projects.get(key).await {
124 if !cached.is_expired() && !cached.is_idle(self.config.tti) {
125 cached.record_access();
126 self.record_hit();
127
128 if cached.access_count > 3 {
130 self.add_to_warming(key.to_string(), cached.warming_priority + 1);
131 }
132
133 self.notify_preloader(key);
134 return Ok(cached.data);
135 }
136 }
137
138 self.record_miss();
139 let data = fetcher().await?;
140
141 let dependencies = Self::create_project_dependencies(&data);
143 let mut cached_data =
144 CachedData::new_with_dependencies(data.clone(), self.config.ttl, dependencies);
145
146 let priority = if key.starts_with("projects:") { 7 } else { 5 };
148 cached_data.update_warming_priority(priority);
149
150 self.projects.insert(key.to_string(), cached_data).await;
151 self.notify_preloader(key);
152 Ok(data)
153 }
154
155 pub async fn get_areas<F, Fut>(&self, key: &str, fetcher: F) -> Result<Vec<Area>>
161 where
162 F: FnOnce() -> Fut,
163 Fut: std::future::Future<Output = Result<Vec<Area>>>,
164 {
165 if let Some(mut cached) = self.areas.get(key).await {
166 if !cached.is_expired() && !cached.is_idle(self.config.tti) {
167 cached.record_access();
168 self.record_hit();
169
170 if cached.access_count > 3 {
172 self.add_to_warming(key.to_string(), cached.warming_priority + 1);
173 }
174
175 self.notify_preloader(key);
176 return Ok(cached.data);
177 }
178 }
179
180 self.record_miss();
181 let data = fetcher().await?;
182
183 let dependencies = Self::create_area_dependencies(&data);
185 let mut cached_data =
186 CachedData::new_with_dependencies(data.clone(), self.config.ttl, dependencies);
187
188 let priority = if key.starts_with("areas:") { 6 } else { 5 };
190 cached_data.update_warming_priority(priority);
191
192 self.areas.insert(key.to_string(), cached_data).await;
193 self.notify_preloader(key);
194 Ok(data)
195 }
196
197 pub async fn get_search_results<F, Fut>(&self, key: &str, fetcher: F) -> Result<Vec<Task>>
203 where
204 F: FnOnce() -> Fut,
205 Fut: std::future::Future<Output = Result<Vec<Task>>>,
206 {
207 if let Some(mut cached) = self.search_results.get(key).await {
208 if !cached.is_expired() && !cached.is_idle(self.config.tti) {
209 cached.record_access();
210 self.record_hit();
211
212 if cached.access_count > 3 {
214 self.add_to_warming(key.to_string(), cached.warming_priority + 1);
215 }
216
217 self.notify_preloader(key);
218 return Ok(cached.data);
219 }
220 }
221
222 self.record_miss();
223 let data = fetcher().await?;
224
225 let dependencies = Self::create_task_dependencies(&data);
227 let mut cached_data =
228 CachedData::new_with_dependencies(data.clone(), self.config.ttl, dependencies);
229
230 let priority = if key.starts_with("search:") { 4 } else { 3 };
232 cached_data.update_warming_priority(priority);
233
234 self.search_results
235 .insert(key.to_string(), cached_data)
236 .await;
237 self.notify_preloader(key);
238 Ok(data)
239 }
240
241 pub fn invalidate_all(&self) {
243 self.tasks.invalidate_all();
244 self.projects.invalidate_all();
245 self.areas.invalidate_all();
246 self.search_results.invalidate_all();
247 }
248
249 pub async fn invalidate(&self, key: &str) {
251 self.tasks.remove(key).await;
252 self.projects.remove(key).await;
253 self.areas.remove(key).await;
254 self.search_results.remove(key).await;
255 }
256
257 #[must_use]
259 pub fn get_stats(&self) -> CacheStats {
260 let mut stats = self.stats.read().clone();
261 stats.entries = self.tasks.entry_count()
262 + self.projects.entry_count()
263 + self.areas.entry_count()
264 + self.search_results.entry_count();
265 stats.calculate_hit_rate();
266 stats
267 }
268
269 pub fn reset_stats(&self) {
271 let mut stats = self.stats.write();
272 *stats = CacheStats::default();
273 }
274
275 fn record_hit(&self) {
277 let mut stats = self.stats.write();
278 stats.hits += 1;
279 }
280
281 fn record_miss(&self) {
283 let mut stats = self.stats.write();
284 stats.misses += 1;
285 }
286
287 fn create_task_dependencies(tasks: &[Task]) -> Vec<CacheDependency> {
289 let mut dependencies = Vec::new();
290
291 for task in tasks {
293 dependencies.push(CacheDependency {
294 entity_type: "task".to_string(),
295 entity_id: Some(task.uuid.clone()),
296 invalidating_operations: vec![
297 "task_updated".to_string(),
298 "task_deleted".to_string(),
299 "task_completed".to_string(),
300 ],
301 });
302
303 if let Some(project_uuid) = &task.project_uuid {
305 dependencies.push(CacheDependency {
306 entity_type: "project".to_string(),
307 entity_id: Some(project_uuid.clone()),
308 invalidating_operations: vec![
309 "project_updated".to_string(),
310 "project_deleted".to_string(),
311 ],
312 });
313 }
314
315 if let Some(area_uuid) = &task.area_uuid {
317 dependencies.push(CacheDependency {
318 entity_type: "area".to_string(),
319 entity_id: Some(area_uuid.clone()),
320 invalidating_operations: vec![
321 "area_updated".to_string(),
322 "area_deleted".to_string(),
323 ],
324 });
325 }
326 }
327
328 dependencies
329 }
330
331 fn create_project_dependencies(projects: &[Project]) -> Vec<CacheDependency> {
333 let mut dependencies = Vec::new();
334
335 for project in projects {
336 dependencies.push(CacheDependency {
337 entity_type: "project".to_string(),
338 entity_id: Some(project.uuid.clone()),
339 invalidating_operations: vec![
340 "project_updated".to_string(),
341 "project_deleted".to_string(),
342 ],
343 });
344
345 if let Some(area_uuid) = &project.area_uuid {
346 dependencies.push(CacheDependency {
347 entity_type: "area".to_string(),
348 entity_id: Some(area_uuid.clone()),
349 invalidating_operations: vec![
350 "area_updated".to_string(),
351 "area_deleted".to_string(),
352 ],
353 });
354 }
355 }
356
357 dependencies
358 }
359
360 fn create_area_dependencies(areas: &[Area]) -> Vec<CacheDependency> {
362 let mut dependencies = Vec::new();
363
364 for area in areas {
365 dependencies.push(CacheDependency {
366 entity_type: "area".to_string(),
367 entity_id: Some(area.uuid.clone()),
368 invalidating_operations: vec![
369 "area_updated".to_string(),
370 "area_deleted".to_string(),
371 ],
372 });
373 }
374
375 dependencies
376 }
377
378 pub(super) fn start_cache_warming(&mut self) {
385 let warming_entries = std::sync::Arc::clone(&self.warming_entries);
386 let preloader = std::sync::Arc::clone(&self.preloader);
387 let stats = std::sync::Arc::clone(&self.stats);
388 let warming_interval = self.config.warming_interval;
389 let max_entries = self.config.max_warming_entries;
390
391 let handle = tokio::spawn(async move {
392 let mut interval = tokio::time::interval(warming_interval);
393 loop {
394 interval.tick().await;
395
396 let entries_to_warm = {
397 let entries = warming_entries.read();
398 let mut sorted_entries: Vec<_> = entries.iter().collect();
399 sorted_entries.sort_by(|a, b| b.1.cmp(a.1));
400 sorted_entries
401 .into_iter()
402 .take(max_entries)
403 .map(|(key, _)| key.clone())
404 .collect::<Vec<_>>()
405 };
406
407 if entries_to_warm.is_empty() {
408 continue;
409 }
410
411 let p_snapshot = preloader.read().clone();
412 if let Some(p) = p_snapshot {
413 for key in &entries_to_warm {
414 p.warm(key);
415 }
416 let mut s = stats.write();
417 s.warming_runs += 1;
418 s.warmed_keys += entries_to_warm.len() as u64;
419 } else {
420 tracing::debug!(
421 "Cache warming {} entries (no preloader registered)",
422 entries_to_warm.len()
423 );
424 }
425
426 let mut entries = warming_entries.write();
427 for key in &entries_to_warm {
428 entries.remove(key);
429 }
430 }
431 });
432
433 self.warming_task = Some(handle);
434 }
435
436 pub fn set_preloader(&self, preloader: std::sync::Arc<dyn CachePreloader>) {
441 *self.preloader.write() = Some(preloader);
442 }
443
444 pub fn clear_preloader(&self) {
447 *self.preloader.write() = None;
448 }
449
450 fn contains_cached_key(&self, key: &str) -> bool {
452 self.tasks.contains_key(key)
453 || self.projects.contains_key(key)
454 || self.areas.contains_key(key)
455 || self.search_results.contains_key(key)
456 }
457
458 fn notify_preloader(&self, accessed_key: &str) {
464 let p_snapshot = self.preloader.read().clone();
465 let Some(p) = p_snapshot else { return };
466 for (k, prio) in p.predict(accessed_key) {
467 if !self.contains_cached_key(&k) {
468 self.add_to_warming(k, prio);
469 }
470 }
471 }
472
473 pub fn add_to_warming(&self, key: String, priority: u32) {
475 let mut entries = self.warming_entries.write();
476 entries.insert(key, priority);
477 }
478
479 pub fn remove_from_warming(&self, key: &str) {
481 let mut entries = self.warming_entries.write();
482 entries.remove(key);
483 }
484
485 pub async fn invalidate_by_entity(
493 &self,
494 entity_type: &str,
495 entity_id: Option<&ThingsId>,
496 ) -> usize {
497 let (task_keys, project_keys, area_keys, search_keys) = {
498 let pred = |dep: &CacheDependency| dep.matches(entity_type, entity_id);
499 (
500 collect_matching_keys(&self.tasks, &pred),
501 collect_matching_keys(&self.projects, &pred),
502 collect_matching_keys(&self.areas, &pred),
503 collect_matching_keys(&self.search_results, &pred),
504 )
505 };
506 let removed = evict_keys(&self.tasks, &task_keys).await
507 + evict_keys(&self.projects, &project_keys).await
508 + evict_keys(&self.areas, &area_keys).await
509 + evict_keys(&self.search_results, &search_keys).await;
510
511 tracing::debug!(
512 "Invalidated {} cache entries depending on {} {:?}",
513 removed,
514 entity_type,
515 entity_id
516 );
517 removed
518 }
519
520 pub async fn invalidate_by_operation(&self, operation: &str) -> usize {
524 let (task_keys, project_keys, area_keys, search_keys) = {
525 let pred = |dep: &CacheDependency| dep.matches_operation(operation);
526 (
527 collect_matching_keys(&self.tasks, &pred),
528 collect_matching_keys(&self.projects, &pred),
529 collect_matching_keys(&self.areas, &pred),
530 collect_matching_keys(&self.search_results, &pred),
531 )
532 };
533 let removed = evict_keys(&self.tasks, &task_keys).await
534 + evict_keys(&self.projects, &project_keys).await
535 + evict_keys(&self.areas, &area_keys).await
536 + evict_keys(&self.search_results, &search_keys).await;
537
538 tracing::debug!(
539 "Invalidated {} cache entries due to operation {}",
540 removed,
541 operation
542 );
543 removed
544 }
545
546 #[must_use]
548 pub fn get_warming_stats(&self) -> (usize, u32) {
549 let entries = self.warming_entries.read();
550 let count = entries.len();
551 let max_priority = entries.values().max().copied().unwrap_or(0);
552 (count, max_priority)
553 }
554
555 pub fn stop_cache_warming(&mut self) {
557 if let Some(handle) = self.warming_task.take() {
558 handle.abort();
559 }
560 }
561}
562
563impl Default for ThingsCache {
564 fn default() -> Self {
565 Self::new_default()
566 }
567}
568
569fn collect_matching_keys<V>(
573 cache: &moka::future::Cache<String, CachedData<V>>,
574 pred: &dyn Fn(&CacheDependency) -> bool,
575) -> Vec<String>
576where
577 V: Clone + Send + Sync + 'static,
578{
579 cache
580 .iter()
581 .filter_map(|(k, v)| {
582 if v.dependencies.iter().any(pred) {
583 Some((*k).clone())
584 } else {
585 None
586 }
587 })
588 .collect()
589}
590
591async fn evict_keys<V>(cache: &moka::future::Cache<String, CachedData<V>>, keys: &[String]) -> usize
597where
598 V: Clone + Send + Sync + 'static,
599{
600 for k in keys {
601 cache.invalidate(k).await;
602 }
603 keys.len()
604}