oxify_storage/
cache.rs

1//! In-Memory Caching Layer
2//!
3//! Provides high-performance in-memory caching for frequently accessed data.
4//!
5//! ## Overview
6//!
7//! The cache layer reduces database load by storing frequently accessed items in memory:
8//! - **Workflow definitions** - Most frequently accessed, rarely change
9//! - **User quotas** - Hot path for execution checks
10//! - **API keys** - Validated on every request
11//! - **Secrets** - Decrypted values cached after first access
12//!
13//! ## Cache Strategy
14//!
15//! - **LRU Eviction**: Least recently used items evicted when capacity reached
16//! - **TTL Support**: Time-based expiration for cache entries
17//! - **Automatic Invalidation**: Write-through invalidation on updates
18//! - **Warm-up**: Pre-populate cache with critical data on startup
19//!
20//! ## Usage Example
21//!
22//! ```ignore
23//! use oxify_storage::{Cache, CacheConfig};
24//! use std::time::Duration;
25//!
26//! let config = CacheConfig {
27//!     max_size: 1000,
28//!     default_ttl: Duration::from_secs(300), // 5 minutes
29//! };
30//! let cache = Cache::new(config);
31//!
32//! // Store workflow in cache
33//! cache.put_workflow(workflow_id, workflow.clone());
34//!
35//! // Retrieve from cache
36//! if let Some(workflow) = cache.get_workflow(&workflow_id) {
37//!     return Ok(workflow);
38//! }
39//!
40//! // Cache miss - fetch from database
41//! let workflow = fetch_from_db(&workflow_id).await?;
42//! cache.put_workflow(workflow_id, workflow.clone());
43//! ```
44
45use crate::models::WorkflowRow;
46use chrono::{DateTime, Duration, Utc};
47
48// Stub types for disabled quota_store module
49// TODO: Re-enable when quota_store is migrated to SQLite
50#[derive(Debug, Clone)]
51pub struct UserQuota {
52    pub user_id: uuid::Uuid,
53    pub max_executions: i64,
54}
55
56#[derive(Debug, Clone)]
57pub struct WorkflowQuota {
58    pub workflow_id: uuid::Uuid,
59    pub max_executions: i64,
60}
61use serde::{Deserialize, Serialize};
62use std::collections::HashMap;
63use std::sync::{Arc, RwLock};
64use uuid::Uuid;
65
66/// Cache configuration
67#[derive(Debug, Clone)]
68pub struct CacheConfig {
69    /// Maximum number of entries per cache type
70    pub max_size: usize,
71    /// Default TTL for cache entries
72    pub default_ttl: std::time::Duration,
73    /// Enable cache metrics collection
74    pub enable_metrics: bool,
75}
76
77impl Default for CacheConfig {
78    fn default() -> Self {
79        Self {
80            max_size: 1000,
81            default_ttl: std::time::Duration::from_secs(300), // 5 minutes
82            enable_metrics: true,
83        }
84    }
85}
86
87/// Cache entry with expiration
88#[derive(Debug, Clone)]
89struct CacheEntry<T> {
90    value: T,
91    expires_at: DateTime<Utc>,
92    access_count: u64,
93    last_accessed: DateTime<Utc>,
94}
95
96impl<T: Clone> CacheEntry<T> {
97    fn new(value: T, ttl: std::time::Duration) -> Self {
98        let now = Utc::now();
99        let ttl_duration = Duration::from_std(ttl).unwrap_or(Duration::seconds(300));
100        Self {
101            value,
102            expires_at: now + ttl_duration,
103            access_count: 0,
104            last_accessed: now,
105        }
106    }
107
108    fn is_expired(&self) -> bool {
109        Utc::now() > self.expires_at
110    }
111
112    fn access(&mut self) -> T {
113        self.access_count += 1;
114        self.last_accessed = Utc::now();
115        self.value.clone()
116    }
117}
118
119/// LRU cache implementation
120struct LruCache<K: std::hash::Hash + Eq, V: Clone> {
121    entries: HashMap<K, CacheEntry<V>>,
122    max_size: usize,
123}
124
125impl<K: std::hash::Hash + Eq + Clone, V: Clone> LruCache<K, V> {
126    fn new(max_size: usize) -> Self {
127        Self {
128            entries: HashMap::with_capacity(max_size),
129            max_size,
130        }
131    }
132
133    fn get(&mut self, key: &K) -> Option<V> {
134        if let Some(entry) = self.entries.get_mut(key) {
135            if entry.is_expired() {
136                self.entries.remove(key);
137                return None;
138            }
139            Some(entry.access())
140        } else {
141            None
142        }
143    }
144
145    fn put(&mut self, key: K, value: V, ttl: std::time::Duration) {
146        // Evict expired entries first
147        self.evict_expired();
148
149        // If at capacity, evict LRU entry
150        if self.entries.len() >= self.max_size {
151            self.evict_lru();
152        }
153
154        self.entries.insert(key, CacheEntry::new(value, ttl));
155    }
156
157    fn invalidate(&mut self, key: &K) {
158        self.entries.remove(key);
159    }
160
161    fn clear(&mut self) {
162        self.entries.clear();
163    }
164
165    #[allow(dead_code)]
166    fn size(&self) -> usize {
167        self.entries.len()
168    }
169
170    fn evict_expired(&mut self) {
171        let now = Utc::now();
172        self.entries.retain(|_, entry| entry.expires_at > now);
173    }
174
175    fn evict_lru(&mut self) {
176        if let Some(lru_key) = self
177            .entries
178            .iter()
179            .min_by_key(|(_, entry)| entry.last_accessed)
180            .map(|(key, _)| key.clone())
181        {
182            self.entries.remove(&lru_key);
183        }
184    }
185
186    fn stats(&self) -> CacheStats {
187        let now = Utc::now();
188        let valid_entries = self.entries.values().filter(|e| e.expires_at > now).count();
189        let total_accesses = self.entries.values().map(|e| e.access_count).sum::<u64>();
190
191        CacheStats {
192            size: self.entries.len(),
193            valid_entries,
194            expired_entries: self.entries.len() - valid_entries,
195            total_accesses,
196            capacity: self.max_size,
197        }
198    }
199}
200
201/// Cache statistics
202#[derive(Debug, Clone, Serialize, Deserialize)]
203pub struct CacheStats {
204    pub size: usize,
205    pub valid_entries: usize,
206    pub expired_entries: usize,
207    pub total_accesses: u64,
208    pub capacity: usize,
209}
210
211impl CacheStats {
212    pub fn utilization(&self) -> f64 {
213        if self.capacity == 0 {
214            return 0.0;
215        }
216        self.size as f64 / self.capacity as f64
217    }
218
219    pub fn hit_rate(&self, hits: u64, misses: u64) -> f64 {
220        let total = hits + misses;
221        if total == 0 {
222            return 0.0;
223        }
224        hits as f64 / total as f64
225    }
226}
227
228/// Cache metrics for monitoring
229#[derive(Debug, Clone, Default, Serialize, Deserialize)]
230pub struct CacheMetrics {
231    pub workflow_hits: u64,
232    pub workflow_misses: u64,
233    pub user_quota_hits: u64,
234    pub user_quota_misses: u64,
235    pub workflow_quota_hits: u64,
236    pub workflow_quota_misses: u64,
237    pub api_key_hits: u64,
238    pub api_key_misses: u64,
239    pub evictions: u64,
240    pub invalidations: u64,
241}
242
243impl CacheMetrics {
244    pub fn workflow_hit_rate(&self) -> f64 {
245        let total = self.workflow_hits + self.workflow_misses;
246        if total == 0 {
247            return 0.0;
248        }
249        self.workflow_hits as f64 / total as f64
250    }
251
252    pub fn user_quota_hit_rate(&self) -> f64 {
253        let total = self.user_quota_hits + self.user_quota_misses;
254        if total == 0 {
255            return 0.0;
256        }
257        self.user_quota_hits as f64 / total as f64
258    }
259
260    pub fn overall_hit_rate(&self) -> f64 {
261        let total_hits = self.workflow_hits
262            + self.user_quota_hits
263            + self.workflow_quota_hits
264            + self.api_key_hits;
265        let total_misses = self.workflow_misses
266            + self.user_quota_misses
267            + self.workflow_quota_misses
268            + self.api_key_misses;
269        let total = total_hits + total_misses;
270        if total == 0 {
271            return 0.0;
272        }
273        total_hits as f64 / total as f64
274    }
275}
276
277/// Multi-level cache for different data types
278pub struct Cache {
279    workflows: Arc<RwLock<LruCache<Uuid, WorkflowRow>>>,
280    user_quotas: Arc<RwLock<LruCache<Uuid, UserQuota>>>,
281    workflow_quotas: Arc<RwLock<LruCache<Uuid, WorkflowQuota>>>,
282    api_keys: Arc<RwLock<LruCache<String, Vec<u8>>>>, // Cached encrypted API keys
283    config: CacheConfig,
284    metrics: Arc<RwLock<CacheMetrics>>,
285}
286
287impl Cache {
288    /// Create a new cache with the given configuration
289    pub fn new(config: CacheConfig) -> Self {
290        Self {
291            workflows: Arc::new(RwLock::new(LruCache::new(config.max_size))),
292            user_quotas: Arc::new(RwLock::new(LruCache::new(config.max_size))),
293            workflow_quotas: Arc::new(RwLock::new(LruCache::new(config.max_size))),
294            api_keys: Arc::new(RwLock::new(LruCache::new(config.max_size))),
295            config,
296            metrics: Arc::new(RwLock::new(CacheMetrics::default())),
297        }
298    }
299
300    // ==================== Workflow Cache ====================
301
302    /// Get workflow from cache
303    pub fn get_workflow(&self, id: &Uuid) -> Option<WorkflowRow> {
304        let mut cache = self.workflows.write().unwrap();
305        let result = cache.get(id);
306
307        if self.config.enable_metrics {
308            let mut metrics = self.metrics.write().unwrap();
309            if result.is_some() {
310                metrics.workflow_hits += 1;
311            } else {
312                metrics.workflow_misses += 1;
313            }
314        }
315
316        result
317    }
318
319    /// Put workflow into cache
320    pub fn put_workflow(&self, id: Uuid, workflow: WorkflowRow) {
321        let mut cache = self.workflows.write().unwrap();
322        cache.put(id, workflow, self.config.default_ttl);
323    }
324
325    /// Invalidate workflow cache entry
326    pub fn invalidate_workflow(&self, id: &Uuid) {
327        let mut cache = self.workflows.write().unwrap();
328        cache.invalidate(id);
329
330        if self.config.enable_metrics {
331            let mut metrics = self.metrics.write().unwrap();
332            metrics.invalidations += 1;
333        }
334    }
335
336    // ==================== User Quota Cache ====================
337
338    /// Get user quota from cache
339    pub fn get_user_quota(&self, user_id: &Uuid) -> Option<UserQuota> {
340        let mut cache = self.user_quotas.write().unwrap();
341        let result = cache.get(user_id);
342
343        if self.config.enable_metrics {
344            let mut metrics = self.metrics.write().unwrap();
345            if result.is_some() {
346                metrics.user_quota_hits += 1;
347            } else {
348                metrics.user_quota_misses += 1;
349            }
350        }
351
352        result
353    }
354
355    /// Put user quota into cache
356    pub fn put_user_quota(&self, user_id: Uuid, quota: UserQuota) {
357        let mut cache = self.user_quotas.write().unwrap();
358        cache.put(user_id, quota, self.config.default_ttl);
359    }
360
361    /// Invalidate user quota cache entry
362    pub fn invalidate_user_quota(&self, user_id: &Uuid) {
363        let mut cache = self.user_quotas.write().unwrap();
364        cache.invalidate(user_id);
365
366        if self.config.enable_metrics {
367            let mut metrics = self.metrics.write().unwrap();
368            metrics.invalidations += 1;
369        }
370    }
371
372    // ==================== Workflow Quota Cache ====================
373
374    /// Get workflow quota from cache
375    pub fn get_workflow_quota(&self, workflow_id: &Uuid) -> Option<WorkflowQuota> {
376        let mut cache = self.workflow_quotas.write().unwrap();
377        let result = cache.get(workflow_id);
378
379        if self.config.enable_metrics {
380            let mut metrics = self.metrics.write().unwrap();
381            if result.is_some() {
382                metrics.workflow_quota_hits += 1;
383            } else {
384                metrics.workflow_quota_misses += 1;
385            }
386        }
387
388        result
389    }
390
391    /// Put workflow quota into cache
392    pub fn put_workflow_quota(&self, workflow_id: Uuid, quota: WorkflowQuota) {
393        let mut cache = self.workflow_quotas.write().unwrap();
394        cache.put(workflow_id, quota, self.config.default_ttl);
395    }
396
397    /// Invalidate workflow quota cache entry
398    pub fn invalidate_workflow_quota(&self, workflow_id: &Uuid) {
399        let mut cache = self.workflow_quotas.write().unwrap();
400        cache.invalidate(workflow_id);
401
402        if self.config.enable_metrics {
403            let mut metrics = self.metrics.write().unwrap();
404            metrics.invalidations += 1;
405        }
406    }
407
408    // ==================== API Key Cache ====================
409
410    /// Get API key from cache
411    pub fn get_api_key(&self, key_hash: &str) -> Option<Vec<u8>> {
412        let mut cache = self.api_keys.write().unwrap();
413        let result = cache.get(&key_hash.to_string());
414
415        if self.config.enable_metrics {
416            let mut metrics = self.metrics.write().unwrap();
417            if result.is_some() {
418                metrics.api_key_hits += 1;
419            } else {
420                metrics.api_key_misses += 1;
421            }
422        }
423
424        result
425    }
426
427    /// Put API key into cache
428    pub fn put_api_key(&self, key_hash: String, encrypted_key: Vec<u8>) {
429        let mut cache = self.api_keys.write().unwrap();
430        cache.put(key_hash, encrypted_key, self.config.default_ttl);
431    }
432
433    /// Invalidate API key cache entry
434    pub fn invalidate_api_key(&self, key_hash: &str) {
435        let mut cache = self.api_keys.write().unwrap();
436        cache.invalidate(&key_hash.to_string());
437
438        if self.config.enable_metrics {
439            let mut metrics = self.metrics.write().unwrap();
440            metrics.invalidations += 1;
441        }
442    }
443
444    // ==================== Cache Management ====================
445
446    /// Clear all caches
447    pub fn clear_all(&self) {
448        self.workflows.write().unwrap().clear();
449        self.user_quotas.write().unwrap().clear();
450        self.workflow_quotas.write().unwrap().clear();
451        self.api_keys.write().unwrap().clear();
452    }
453
454    /// Evict expired entries from all caches
455    pub fn evict_expired(&self) {
456        self.workflows.write().unwrap().evict_expired();
457        self.user_quotas.write().unwrap().evict_expired();
458        self.workflow_quotas.write().unwrap().evict_expired();
459        self.api_keys.write().unwrap().evict_expired();
460    }
461
462    /// Get cache statistics
463    pub fn stats(&self) -> HashMap<String, CacheStats> {
464        let mut stats = HashMap::new();
465        stats.insert(
466            "workflows".to_string(),
467            self.workflows.read().unwrap().stats(),
468        );
469        stats.insert(
470            "user_quotas".to_string(),
471            self.user_quotas.read().unwrap().stats(),
472        );
473        stats.insert(
474            "workflow_quotas".to_string(),
475            self.workflow_quotas.read().unwrap().stats(),
476        );
477        stats.insert(
478            "api_keys".to_string(),
479            self.api_keys.read().unwrap().stats(),
480        );
481        stats
482    }
483
484    /// Get cache metrics
485    pub fn metrics(&self) -> CacheMetrics {
486        self.metrics.read().unwrap().clone()
487    }
488
489    /// Reset cache metrics
490    pub fn reset_metrics(&self) {
491        let mut metrics = self.metrics.write().unwrap();
492        *metrics = CacheMetrics::default();
493    }
494
495    /// Export metrics in a format suitable for monitoring systems
496    pub fn export_metrics(&self) -> HashMap<String, f64> {
497        let metrics = self.metrics.read().unwrap();
498        let mut export = HashMap::new();
499
500        export.insert("workflow_hits".to_string(), metrics.workflow_hits as f64);
501        export.insert(
502            "workflow_misses".to_string(),
503            metrics.workflow_misses as f64,
504        );
505        export.insert("workflow_hit_rate".to_string(), metrics.workflow_hit_rate());
506
507        export.insert(
508            "user_quota_hits".to_string(),
509            metrics.user_quota_hits as f64,
510        );
511        export.insert(
512            "user_quota_misses".to_string(),
513            metrics.user_quota_misses as f64,
514        );
515        export.insert(
516            "user_quota_hit_rate".to_string(),
517            metrics.user_quota_hit_rate(),
518        );
519
520        export.insert(
521            "workflow_quota_hits".to_string(),
522            metrics.workflow_quota_hits as f64,
523        );
524        export.insert(
525            "workflow_quota_misses".to_string(),
526            metrics.workflow_quota_misses as f64,
527        );
528
529        export.insert("api_key_hits".to_string(), metrics.api_key_hits as f64);
530        export.insert("api_key_misses".to_string(), metrics.api_key_misses as f64);
531
532        export.insert("overall_hit_rate".to_string(), metrics.overall_hit_rate());
533        export.insert("evictions".to_string(), metrics.evictions as f64);
534        export.insert("invalidations".to_string(), metrics.invalidations as f64);
535
536        // Add size metrics
537        let stats = self.stats();
538        for (cache_name, cache_stats) in stats {
539            export.insert(format!("{cache_name}_size"), cache_stats.size as f64);
540            export.insert(
541                format!("{cache_name}_utilization"),
542                cache_stats.utilization(),
543            );
544        }
545
546        export
547    }
548}
549
550#[cfg(test)]
551mod tests {
552    use super::*;
553
554    #[test]
555    fn test_cache_basic_operations() {
556        let config = CacheConfig {
557            max_size: 10,
558            default_ttl: std::time::Duration::from_secs(60),
559            enable_metrics: true,
560        };
561        let cache = Cache::new(config);
562
563        let workflow_id = Uuid::new_v4();
564        let workflow = WorkflowRow {
565            id: workflow_id.to_string(),
566            name: "test".to_string(),
567            description: None,
568            definition: serde_json::to_string(&serde_json::json!({})).unwrap(),
569            version: 1,
570            tags: None,
571            created_at: Utc::now().to_rfc3339(),
572            updated_at: Utc::now().to_rfc3339(),
573        };
574
575        // Put workflow in cache
576        cache.put_workflow(workflow_id, workflow.clone());
577
578        // Get workflow from cache (hit)
579        let cached = cache.get_workflow(&workflow_id);
580        assert!(cached.is_some());
581        assert_eq!(cached.unwrap().id, workflow_id.to_string());
582
583        // Check metrics
584        let metrics = cache.metrics();
585        assert_eq!(metrics.workflow_hits, 1);
586        assert_eq!(metrics.workflow_misses, 0);
587
588        // Invalidate workflow
589        cache.invalidate_workflow(&workflow_id);
590
591        // Get workflow after invalidation (miss)
592        let cached = cache.get_workflow(&workflow_id);
593        assert!(cached.is_none());
594
595        let metrics = cache.metrics();
596        assert_eq!(metrics.workflow_hits, 1);
597        assert_eq!(metrics.workflow_misses, 1);
598    }
599
600    #[test]
601    fn test_cache_lru_eviction() {
602        let config = CacheConfig {
603            max_size: 2,
604            default_ttl: std::time::Duration::from_secs(60),
605            enable_metrics: false,
606        };
607        let cache = Cache::new(config);
608
609        let id1 = Uuid::new_v4();
610        let id2 = Uuid::new_v4();
611        let id3 = Uuid::new_v4();
612
613        let workflow1 = WorkflowRow {
614            id: id1.to_string(),
615            name: "test1".to_string(),
616            description: None,
617            definition: serde_json::to_string(&serde_json::json!({})).unwrap(),
618            version: 1,
619            tags: None,
620            created_at: Utc::now().to_rfc3339(),
621            updated_at: Utc::now().to_rfc3339(),
622        };
623
624        let mut workflow2 = workflow1.clone();
625        workflow2.id = id2.to_string();
626        workflow2.name = "test2".to_string();
627
628        let mut workflow3 = workflow1.clone();
629        workflow3.id = id3.to_string();
630        workflow3.name = "test3".to_string();
631
632        // Fill cache to capacity
633        cache.put_workflow(id1, workflow1);
634        cache.put_workflow(id2, workflow2);
635
636        // Access workflow1 to make it recently used
637        cache.get_workflow(&id1);
638
639        // Add workflow3, should evict workflow2 (LRU)
640        cache.put_workflow(id3, workflow3);
641
642        // workflow1 should still be in cache
643        assert!(cache.get_workflow(&id1).is_some());
644
645        // workflow3 should be in cache
646        assert!(cache.get_workflow(&id3).is_some());
647    }
648
649    #[test]
650    fn test_cache_stats() {
651        let config = CacheConfig {
652            max_size: 100,
653            default_ttl: std::time::Duration::from_secs(60),
654            enable_metrics: true,
655        };
656        let cache = Cache::new(config);
657
658        let stats = cache.stats();
659        assert_eq!(stats.get("workflows").unwrap().size, 0);
660        assert_eq!(stats.get("workflows").unwrap().capacity, 100);
661    }
662
663    #[test]
664    fn test_cache_metrics_hit_rate() {
665        let metrics = CacheMetrics {
666            workflow_hits: 80,
667            workflow_misses: 20,
668            user_quota_hits: 90,
669            user_quota_misses: 10,
670            ..Default::default()
671        };
672
673        assert_eq!(metrics.workflow_hit_rate(), 0.8);
674        assert_eq!(metrics.user_quota_hit_rate(), 0.9);
675        assert_eq!(metrics.overall_hit_rate(), 0.85);
676    }
677}