// things3_core/cache/operations.rs
1use crate::models::{Area, Project, Task, ThingsId};
2use anyhow::Result;
3use moka::future::Cache;
4
5use super::config::{CacheConfig, CacheDependency};
6use super::stats::{CachePreloader, CacheStats, CachedData};
7use super::ThingsCache;
8
9impl ThingsCache {
10    /// Create a new cache with the given configuration
11    #[must_use]
12    pub fn new(config: &CacheConfig) -> Self {
13        let tasks = Cache::builder()
14            .max_capacity(config.max_capacity)
15            .time_to_live(config.ttl)
16            .time_to_idle(config.tti)
17            .build();
18
19        let projects = Cache::builder()
20            .max_capacity(config.max_capacity)
21            .time_to_live(config.ttl)
22            .time_to_idle(config.tti)
23            .build();
24
25        let areas = Cache::builder()
26            .max_capacity(config.max_capacity)
27            .time_to_live(config.ttl)
28            .time_to_idle(config.tti)
29            .build();
30
31        let search_results = Cache::builder()
32            .max_capacity(config.max_capacity)
33            .time_to_live(config.ttl)
34            .time_to_idle(config.tti)
35            .build();
36
37        let mut cache = Self {
38            tasks,
39            projects,
40            areas,
41            search_results,
42            stats: std::sync::Arc::new(parking_lot::RwLock::new(CacheStats::default())),
43            config: config.clone(),
44            warming_entries: std::sync::Arc::new(parking_lot::RwLock::new(
45                std::collections::HashMap::new(),
46            )),
47            preloader: std::sync::Arc::new(parking_lot::RwLock::new(None)),
48            warming_task: None,
49        };
50
51        // Start cache warming task if enabled
52        if config.enable_cache_warming {
53            cache.start_cache_warming();
54        }
55
56        cache
57    }
58
59    /// Create a new cache with default configuration
60    #[must_use]
61    pub fn new_default() -> Self {
62        Self::new(&CacheConfig::default())
63    }
64
65    /// Get tasks from cache or fetch if not cached
66    ///
67    /// # Errors
68    ///
69    /// Returns an error if the fetcher function fails.
70    pub async fn get_tasks<F, Fut>(&self, key: &str, fetcher: F) -> Result<Vec<Task>>
71    where
72        F: FnOnce() -> Fut,
73        Fut: std::future::Future<Output = Result<Vec<Task>>>,
74    {
75        if let Some(mut cached) = self.tasks.get(key).await {
76            if !cached.is_expired() && !cached.is_idle(self.config.tti) {
77                cached.record_access();
78                self.record_hit();
79
80                // Add to warming if frequently accessed
81                if cached.access_count > 3 {
82                    self.add_to_warming(key.to_string(), cached.warming_priority + 1);
83                }
84
85                self.notify_preloader(key);
86                return Ok(cached.data);
87            }
88        }
89
90        self.record_miss();
91        let data = fetcher().await?;
92
93        // Create dependencies for intelligent invalidation
94        let dependencies = Self::create_task_dependencies(&data);
95        let mut cached_data =
96            CachedData::new_with_dependencies(data.clone(), self.config.ttl, dependencies);
97
98        // Set initial warming priority based on key type
99        let priority = if key.starts_with("inbox:") {
100            10
101        } else if key.starts_with("today:") {
102            8
103        } else {
104            5
105        };
106        cached_data.update_warming_priority(priority);
107
108        self.tasks.insert(key.to_string(), cached_data).await;
109        self.notify_preloader(key);
110        Ok(data)
111    }
112
113    /// Get projects from cache or fetch if not cached
114    ///
115    /// # Errors
116    ///
117    /// Returns an error if the fetcher function fails.
118    pub async fn get_projects<F, Fut>(&self, key: &str, fetcher: F) -> Result<Vec<Project>>
119    where
120        F: FnOnce() -> Fut,
121        Fut: std::future::Future<Output = Result<Vec<Project>>>,
122    {
123        if let Some(mut cached) = self.projects.get(key).await {
124            if !cached.is_expired() && !cached.is_idle(self.config.tti) {
125                cached.record_access();
126                self.record_hit();
127
128                // Add to warming if frequently accessed
129                if cached.access_count > 3 {
130                    self.add_to_warming(key.to_string(), cached.warming_priority + 1);
131                }
132
133                self.notify_preloader(key);
134                return Ok(cached.data);
135            }
136        }
137
138        self.record_miss();
139        let data = fetcher().await?;
140
141        // Create dependencies for intelligent invalidation
142        let dependencies = Self::create_project_dependencies(&data);
143        let mut cached_data =
144            CachedData::new_with_dependencies(data.clone(), self.config.ttl, dependencies);
145
146        // Set initial warming priority
147        let priority = if key.starts_with("projects:") { 7 } else { 5 };
148        cached_data.update_warming_priority(priority);
149
150        self.projects.insert(key.to_string(), cached_data).await;
151        self.notify_preloader(key);
152        Ok(data)
153    }
154
155    /// Get areas from cache or fetch if not cached
156    ///
157    /// # Errors
158    ///
159    /// Returns an error if the fetcher function fails.
160    pub async fn get_areas<F, Fut>(&self, key: &str, fetcher: F) -> Result<Vec<Area>>
161    where
162        F: FnOnce() -> Fut,
163        Fut: std::future::Future<Output = Result<Vec<Area>>>,
164    {
165        if let Some(mut cached) = self.areas.get(key).await {
166            if !cached.is_expired() && !cached.is_idle(self.config.tti) {
167                cached.record_access();
168                self.record_hit();
169
170                // Add to warming if frequently accessed
171                if cached.access_count > 3 {
172                    self.add_to_warming(key.to_string(), cached.warming_priority + 1);
173                }
174
175                self.notify_preloader(key);
176                return Ok(cached.data);
177            }
178        }
179
180        self.record_miss();
181        let data = fetcher().await?;
182
183        // Create dependencies for intelligent invalidation
184        let dependencies = Self::create_area_dependencies(&data);
185        let mut cached_data =
186            CachedData::new_with_dependencies(data.clone(), self.config.ttl, dependencies);
187
188        // Set initial warming priority
189        let priority = if key.starts_with("areas:") { 6 } else { 5 };
190        cached_data.update_warming_priority(priority);
191
192        self.areas.insert(key.to_string(), cached_data).await;
193        self.notify_preloader(key);
194        Ok(data)
195    }
196
197    /// Get search results from cache or fetch if not cached
198    ///
199    /// # Errors
200    ///
201    /// Returns an error if the fetcher function fails.
202    pub async fn get_search_results<F, Fut>(&self, key: &str, fetcher: F) -> Result<Vec<Task>>
203    where
204        F: FnOnce() -> Fut,
205        Fut: std::future::Future<Output = Result<Vec<Task>>>,
206    {
207        if let Some(mut cached) = self.search_results.get(key).await {
208            if !cached.is_expired() && !cached.is_idle(self.config.tti) {
209                cached.record_access();
210                self.record_hit();
211
212                // Add to warming if frequently accessed
213                if cached.access_count > 3 {
214                    self.add_to_warming(key.to_string(), cached.warming_priority + 1);
215                }
216
217                self.notify_preloader(key);
218                return Ok(cached.data);
219            }
220        }
221
222        self.record_miss();
223        let data = fetcher().await?;
224
225        // Create dependencies for intelligent invalidation
226        let dependencies = Self::create_task_dependencies(&data);
227        let mut cached_data =
228            CachedData::new_with_dependencies(data.clone(), self.config.ttl, dependencies);
229
230        // Set initial warming priority for search results
231        let priority = if key.starts_with("search:") { 4 } else { 3 };
232        cached_data.update_warming_priority(priority);
233
234        self.search_results
235            .insert(key.to_string(), cached_data)
236            .await;
237        self.notify_preloader(key);
238        Ok(data)
239    }
240
241    /// Invalidate all caches
242    pub fn invalidate_all(&self) {
243        self.tasks.invalidate_all();
244        self.projects.invalidate_all();
245        self.areas.invalidate_all();
246        self.search_results.invalidate_all();
247    }
248
249    /// Invalidate specific cache entry
250    pub async fn invalidate(&self, key: &str) {
251        self.tasks.remove(key).await;
252        self.projects.remove(key).await;
253        self.areas.remove(key).await;
254        self.search_results.remove(key).await;
255    }
256
257    /// Get cache statistics
258    #[must_use]
259    pub fn get_stats(&self) -> CacheStats {
260        let mut stats = self.stats.read().clone();
261        stats.entries = self.tasks.entry_count()
262            + self.projects.entry_count()
263            + self.areas.entry_count()
264            + self.search_results.entry_count();
265        stats.calculate_hit_rate();
266        stats
267    }
268
269    /// Reset cache statistics
270    pub fn reset_stats(&self) {
271        let mut stats = self.stats.write();
272        *stats = CacheStats::default();
273    }
274
275    /// Record a cache hit
276    fn record_hit(&self) {
277        let mut stats = self.stats.write();
278        stats.hits += 1;
279    }
280
281    /// Record a cache miss
282    fn record_miss(&self) {
283        let mut stats = self.stats.write();
284        stats.misses += 1;
285    }
286
287    /// Create dependencies for task data
288    fn create_task_dependencies(tasks: &[Task]) -> Vec<CacheDependency> {
289        let mut dependencies = Vec::new();
290
291        // Add dependencies for each task
292        for task in tasks {
293            dependencies.push(CacheDependency {
294                entity_type: "task".to_string(),
295                entity_id: Some(task.uuid.clone()),
296                invalidating_operations: vec![
297                    "task_updated".to_string(),
298                    "task_deleted".to_string(),
299                    "task_completed".to_string(),
300                ],
301            });
302
303            // Add project dependency if task belongs to a project
304            if let Some(project_uuid) = &task.project_uuid {
305                dependencies.push(CacheDependency {
306                    entity_type: "project".to_string(),
307                    entity_id: Some(project_uuid.clone()),
308                    invalidating_operations: vec![
309                        "project_updated".to_string(),
310                        "project_deleted".to_string(),
311                    ],
312                });
313            }
314
315            // Add area dependency if task belongs to an area
316            if let Some(area_uuid) = &task.area_uuid {
317                dependencies.push(CacheDependency {
318                    entity_type: "area".to_string(),
319                    entity_id: Some(area_uuid.clone()),
320                    invalidating_operations: vec![
321                        "area_updated".to_string(),
322                        "area_deleted".to_string(),
323                    ],
324                });
325            }
326        }
327
328        dependencies
329    }
330
331    /// Create dependencies for project data
332    fn create_project_dependencies(projects: &[Project]) -> Vec<CacheDependency> {
333        let mut dependencies = Vec::new();
334
335        for project in projects {
336            dependencies.push(CacheDependency {
337                entity_type: "project".to_string(),
338                entity_id: Some(project.uuid.clone()),
339                invalidating_operations: vec![
340                    "project_updated".to_string(),
341                    "project_deleted".to_string(),
342                ],
343            });
344
345            if let Some(area_uuid) = &project.area_uuid {
346                dependencies.push(CacheDependency {
347                    entity_type: "area".to_string(),
348                    entity_id: Some(area_uuid.clone()),
349                    invalidating_operations: vec![
350                        "area_updated".to_string(),
351                        "area_deleted".to_string(),
352                    ],
353                });
354            }
355        }
356
357        dependencies
358    }
359
360    /// Create dependencies for area data
361    fn create_area_dependencies(areas: &[Area]) -> Vec<CacheDependency> {
362        let mut dependencies = Vec::new();
363
364        for area in areas {
365            dependencies.push(CacheDependency {
366                entity_type: "area".to_string(),
367                entity_id: Some(area.uuid.clone()),
368                invalidating_operations: vec![
369                    "area_updated".to_string(),
370                    "area_deleted".to_string(),
371                ],
372            });
373        }
374
375        dependencies
376    }
377
378    /// Start cache warming background task.
379    ///
380    /// Each tick, drains the top-priority queued keys and dispatches each to
381    /// the registered [`CachePreloader`] (if any). Keys are removed from the
382    /// queue after dispatch — the preloader's own `predict` calls re-add them
383    /// later if they remain hot.
384    pub(super) fn start_cache_warming(&mut self) {
385        let warming_entries = std::sync::Arc::clone(&self.warming_entries);
386        let preloader = std::sync::Arc::clone(&self.preloader);
387        let stats = std::sync::Arc::clone(&self.stats);
388        let warming_interval = self.config.warming_interval;
389        let max_entries = self.config.max_warming_entries;
390
391        let handle = tokio::spawn(async move {
392            let mut interval = tokio::time::interval(warming_interval);
393            loop {
394                interval.tick().await;
395
396                let entries_to_warm = {
397                    let entries = warming_entries.read();
398                    let mut sorted_entries: Vec<_> = entries.iter().collect();
399                    sorted_entries.sort_by(|a, b| b.1.cmp(a.1));
400                    sorted_entries
401                        .into_iter()
402                        .take(max_entries)
403                        .map(|(key, _)| key.clone())
404                        .collect::<Vec<_>>()
405                };
406
407                if entries_to_warm.is_empty() {
408                    continue;
409                }
410
411                let p_snapshot = preloader.read().clone();
412                if let Some(p) = p_snapshot {
413                    for key in &entries_to_warm {
414                        p.warm(key);
415                    }
416                    let mut s = stats.write();
417                    s.warming_runs += 1;
418                    s.warmed_keys += entries_to_warm.len() as u64;
419                } else {
420                    tracing::debug!(
421                        "Cache warming {} entries (no preloader registered)",
422                        entries_to_warm.len()
423                    );
424                }
425
426                let mut entries = warming_entries.write();
427                for key in &entries_to_warm {
428                    entries.remove(key);
429                }
430            }
431        });
432
433        self.warming_task = Some(handle);
434    }
435
436    /// Register a preloader. Replaces any previously-registered preloader.
437    ///
438    /// The preloader's `predict` will be invoked after every `get_*` call,
439    /// and `warm` will be invoked by the warming-loop tick for queued keys.
440    pub fn set_preloader(&self, preloader: std::sync::Arc<dyn CachePreloader>) {
441        *self.preloader.write() = Some(preloader);
442    }
443
444    /// Remove the registered preloader. Subsequent `get_*` calls and warming
445    /// ticks become no-ops with respect to predictive preloading.
446    pub fn clear_preloader(&self) {
447        *self.preloader.write() = None;
448    }
449
450    /// Returns `true` if `key` is present in any of the four underlying caches.
451    fn contains_cached_key(&self, key: &str) -> bool {
452        self.tasks.contains_key(key)
453            || self.projects.contains_key(key)
454            || self.areas.contains_key(key)
455            || self.search_results.contains_key(key)
456    }
457
458    /// Snapshot the registered preloader and call its `predict`, pushing any
459    /// returned `(key, priority)` pairs into `warming_entries`.
460    /// Keys already present in the cache are skipped — this prevents a
461    /// self-reinforcing loop where warming a key triggers predict on its
462    /// counterpart, which re-enqueues the original key indefinitely.
463    fn notify_preloader(&self, accessed_key: &str) {
464        let p_snapshot = self.preloader.read().clone();
465        let Some(p) = p_snapshot else { return };
466        for (k, prio) in p.predict(accessed_key) {
467            if !self.contains_cached_key(&k) {
468                self.add_to_warming(k, prio);
469            }
470        }
471    }
472
473    /// Add entry to cache warming list
474    pub fn add_to_warming(&self, key: String, priority: u32) {
475        let mut entries = self.warming_entries.write();
476        entries.insert(key, priority);
477    }
478
479    /// Remove entry from cache warming list
480    pub fn remove_from_warming(&self, key: &str) {
481        let mut entries = self.warming_entries.write();
482        entries.remove(key);
483    }
484
485    /// Selectively invalidate cache entries whose dependencies match
486    /// `(entity_type, entity_id)`. Returns the number of keys submitted for
487    /// eviction (moka eviction may complete asynchronously).
488    ///
489    /// `entity_id == None` is a wildcard that matches any cached entry
490    /// depending on `entity_type`. Entries that do not depend on the mutated
491    /// entity are left untouched.
492    pub async fn invalidate_by_entity(
493        &self,
494        entity_type: &str,
495        entity_id: Option<&ThingsId>,
496    ) -> usize {
497        let (task_keys, project_keys, area_keys, search_keys) = {
498            let pred = |dep: &CacheDependency| dep.matches(entity_type, entity_id);
499            (
500                collect_matching_keys(&self.tasks, &pred),
501                collect_matching_keys(&self.projects, &pred),
502                collect_matching_keys(&self.areas, &pred),
503                collect_matching_keys(&self.search_results, &pred),
504            )
505        };
506        let removed = evict_keys(&self.tasks, &task_keys).await
507            + evict_keys(&self.projects, &project_keys).await
508            + evict_keys(&self.areas, &area_keys).await
509            + evict_keys(&self.search_results, &search_keys).await;
510
511        tracing::debug!(
512            "Invalidated {} cache entries depending on {} {:?}",
513            removed,
514            entity_type,
515            entity_id
516        );
517        removed
518    }
519
520    /// Selectively invalidate cache entries whose dependencies list `operation`
521    /// among their invalidating operations. Returns the number of keys submitted
522    /// for eviction (moka eviction may complete asynchronously).
523    pub async fn invalidate_by_operation(&self, operation: &str) -> usize {
524        let (task_keys, project_keys, area_keys, search_keys) = {
525            let pred = |dep: &CacheDependency| dep.matches_operation(operation);
526            (
527                collect_matching_keys(&self.tasks, &pred),
528                collect_matching_keys(&self.projects, &pred),
529                collect_matching_keys(&self.areas, &pred),
530                collect_matching_keys(&self.search_results, &pred),
531            )
532        };
533        let removed = evict_keys(&self.tasks, &task_keys).await
534            + evict_keys(&self.projects, &project_keys).await
535            + evict_keys(&self.areas, &area_keys).await
536            + evict_keys(&self.search_results, &search_keys).await;
537
538        tracing::debug!(
539            "Invalidated {} cache entries due to operation {}",
540            removed,
541            operation
542        );
543        removed
544    }
545
546    /// Get cache warming statistics
547    #[must_use]
548    pub fn get_warming_stats(&self) -> (usize, u32) {
549        let entries = self.warming_entries.read();
550        let count = entries.len();
551        let max_priority = entries.values().max().copied().unwrap_or(0);
552        (count, max_priority)
553    }
554
555    /// Stop cache warming
556    pub fn stop_cache_warming(&mut self) {
557        if let Some(handle) = self.warming_task.take() {
558            handle.abort();
559        }
560    }
561}
562
563impl Default for ThingsCache {
564    fn default() -> Self {
565        Self::new_default()
566    }
567}
568
569/// Walk a moka cache synchronously and collect keys whose dependency list
570/// satisfies `pred`. Split from [`evict_keys`] so the (non-`Send`) predicate is
571/// dropped before any `.await`, keeping the surrounding async fn `Send`.
572fn collect_matching_keys<V>(
573    cache: &moka::future::Cache<String, CachedData<V>>,
574    pred: &dyn Fn(&CacheDependency) -> bool,
575) -> Vec<String>
576where
577    V: Clone + Send + Sync + 'static,
578{
579    cache
580        .iter()
581        .filter_map(|(k, v)| {
582            if v.dependencies.iter().any(pred) {
583                Some((*k).clone())
584            } else {
585                None
586            }
587        })
588        .collect()
589}
590
591/// Evict the given keys from a moka cache.
592///
593/// Returns the number of keys submitted for eviction. Moka's `invalidate` is
594/// async but the actual removal may lag slightly; callers that need to observe
595/// the post-eviction state should `await` a short yield or sleep.
596async fn evict_keys<V>(cache: &moka::future::Cache<String, CachedData<V>>, keys: &[String]) -> usize
597where
598    V: Clone + Send + Sync + 'static,
599{
600    for k in keys {
601        cache.invalidate(k).await;
602    }
603    keys.len()
604}