1use crate::models::WorkflowRow;
46use chrono::{DateTime, Duration, Utc};
47
48#[derive(Debug, Clone)]
51pub struct UserQuota {
52 pub user_id: uuid::Uuid,
53 pub max_executions: i64,
54}
55
56#[derive(Debug, Clone)]
57pub struct WorkflowQuota {
58 pub workflow_id: uuid::Uuid,
59 pub max_executions: i64,
60}
61use serde::{Deserialize, Serialize};
62use std::collections::HashMap;
63use std::sync::{Arc, RwLock};
64use uuid::Uuid;
65
/// Settings shared by every store inside a `Cache`.
#[derive(Debug, Clone)]
pub struct CacheConfig {
    /// Capacity given to each underlying store at construction time.
    pub max_size: usize,
    /// Time-to-live applied to every entry when it is inserted.
    pub default_ttl: std::time::Duration,
    /// When `true`, lookups and invalidations update the hit/miss counters.
    pub enable_metrics: bool,
}
76
77impl Default for CacheConfig {
78 fn default() -> Self {
79 Self {
80 max_size: 1000,
81 default_ttl: std::time::Duration::from_secs(300), enable_metrics: true,
83 }
84 }
85}
86
87#[derive(Debug, Clone)]
89struct CacheEntry<T> {
90 value: T,
91 expires_at: DateTime<Utc>,
92 access_count: u64,
93 last_accessed: DateTime<Utc>,
94}
95
96impl<T: Clone> CacheEntry<T> {
97 fn new(value: T, ttl: std::time::Duration) -> Self {
98 let now = Utc::now();
99 let ttl_duration = Duration::from_std(ttl).unwrap_or(Duration::seconds(300));
100 Self {
101 value,
102 expires_at: now + ttl_duration,
103 access_count: 0,
104 last_accessed: now,
105 }
106 }
107
108 fn is_expired(&self) -> bool {
109 Utc::now() > self.expires_at
110 }
111
112 fn access(&mut self) -> T {
113 self.access_count += 1;
114 self.last_accessed = Utc::now();
115 self.value.clone()
116 }
117}
118
119struct LruCache<K: std::hash::Hash + Eq, V: Clone> {
121 entries: HashMap<K, CacheEntry<V>>,
122 max_size: usize,
123}
124
125impl<K: std::hash::Hash + Eq + Clone, V: Clone> LruCache<K, V> {
126 fn new(max_size: usize) -> Self {
127 Self {
128 entries: HashMap::with_capacity(max_size),
129 max_size,
130 }
131 }
132
133 fn get(&mut self, key: &K) -> Option<V> {
134 if let Some(entry) = self.entries.get_mut(key) {
135 if entry.is_expired() {
136 self.entries.remove(key);
137 return None;
138 }
139 Some(entry.access())
140 } else {
141 None
142 }
143 }
144
145 fn put(&mut self, key: K, value: V, ttl: std::time::Duration) {
146 self.evict_expired();
148
149 if self.entries.len() >= self.max_size {
151 self.evict_lru();
152 }
153
154 self.entries.insert(key, CacheEntry::new(value, ttl));
155 }
156
157 fn invalidate(&mut self, key: &K) {
158 self.entries.remove(key);
159 }
160
161 fn clear(&mut self) {
162 self.entries.clear();
163 }
164
165 #[allow(dead_code)]
166 fn size(&self) -> usize {
167 self.entries.len()
168 }
169
170 fn evict_expired(&mut self) {
171 let now = Utc::now();
172 self.entries.retain(|_, entry| entry.expires_at > now);
173 }
174
175 fn evict_lru(&mut self) {
176 if let Some(lru_key) = self
177 .entries
178 .iter()
179 .min_by_key(|(_, entry)| entry.last_accessed)
180 .map(|(key, _)| key.clone())
181 {
182 self.entries.remove(&lru_key);
183 }
184 }
185
186 fn stats(&self) -> CacheStats {
187 let now = Utc::now();
188 let valid_entries = self.entries.values().filter(|e| e.expires_at > now).count();
189 let total_accesses = self.entries.values().map(|e| e.access_count).sum::<u64>();
190
191 CacheStats {
192 size: self.entries.len(),
193 valid_entries,
194 expired_entries: self.entries.len() - valid_entries,
195 total_accesses,
196 capacity: self.max_size,
197 }
198 }
199}
200
201#[derive(Debug, Clone, Serialize, Deserialize)]
203pub struct CacheStats {
204 pub size: usize,
205 pub valid_entries: usize,
206 pub expired_entries: usize,
207 pub total_accesses: u64,
208 pub capacity: usize,
209}
210
211impl CacheStats {
212 pub fn utilization(&self) -> f64 {
213 if self.capacity == 0 {
214 return 0.0;
215 }
216 self.size as f64 / self.capacity as f64
217 }
218
219 pub fn hit_rate(&self, hits: u64, misses: u64) -> f64 {
220 let total = hits + misses;
221 if total == 0 {
222 return 0.0;
223 }
224 hits as f64 / total as f64
225 }
226}
227
228#[derive(Debug, Clone, Default, Serialize, Deserialize)]
230pub struct CacheMetrics {
231 pub workflow_hits: u64,
232 pub workflow_misses: u64,
233 pub user_quota_hits: u64,
234 pub user_quota_misses: u64,
235 pub workflow_quota_hits: u64,
236 pub workflow_quota_misses: u64,
237 pub api_key_hits: u64,
238 pub api_key_misses: u64,
239 pub evictions: u64,
240 pub invalidations: u64,
241}
242
243impl CacheMetrics {
244 pub fn workflow_hit_rate(&self) -> f64 {
245 let total = self.workflow_hits + self.workflow_misses;
246 if total == 0 {
247 return 0.0;
248 }
249 self.workflow_hits as f64 / total as f64
250 }
251
252 pub fn user_quota_hit_rate(&self) -> f64 {
253 let total = self.user_quota_hits + self.user_quota_misses;
254 if total == 0 {
255 return 0.0;
256 }
257 self.user_quota_hits as f64 / total as f64
258 }
259
260 pub fn overall_hit_rate(&self) -> f64 {
261 let total_hits = self.workflow_hits
262 + self.user_quota_hits
263 + self.workflow_quota_hits
264 + self.api_key_hits;
265 let total_misses = self.workflow_misses
266 + self.user_quota_misses
267 + self.workflow_quota_misses
268 + self.api_key_misses;
269 let total = total_hits + total_misses;
270 if total == 0 {
271 return 0.0;
272 }
273 total_hits as f64 / total as f64
274 }
275}
276
277pub struct Cache {
279 workflows: Arc<RwLock<LruCache<Uuid, WorkflowRow>>>,
280 user_quotas: Arc<RwLock<LruCache<Uuid, UserQuota>>>,
281 workflow_quotas: Arc<RwLock<LruCache<Uuid, WorkflowQuota>>>,
282 api_keys: Arc<RwLock<LruCache<String, Vec<u8>>>>, config: CacheConfig,
284 metrics: Arc<RwLock<CacheMetrics>>,
285}
286
287impl Cache {
288 pub fn new(config: CacheConfig) -> Self {
290 Self {
291 workflows: Arc::new(RwLock::new(LruCache::new(config.max_size))),
292 user_quotas: Arc::new(RwLock::new(LruCache::new(config.max_size))),
293 workflow_quotas: Arc::new(RwLock::new(LruCache::new(config.max_size))),
294 api_keys: Arc::new(RwLock::new(LruCache::new(config.max_size))),
295 config,
296 metrics: Arc::new(RwLock::new(CacheMetrics::default())),
297 }
298 }
299
300 pub fn get_workflow(&self, id: &Uuid) -> Option<WorkflowRow> {
304 let mut cache = self.workflows.write().unwrap();
305 let result = cache.get(id);
306
307 if self.config.enable_metrics {
308 let mut metrics = self.metrics.write().unwrap();
309 if result.is_some() {
310 metrics.workflow_hits += 1;
311 } else {
312 metrics.workflow_misses += 1;
313 }
314 }
315
316 result
317 }
318
319 pub fn put_workflow(&self, id: Uuid, workflow: WorkflowRow) {
321 let mut cache = self.workflows.write().unwrap();
322 cache.put(id, workflow, self.config.default_ttl);
323 }
324
325 pub fn invalidate_workflow(&self, id: &Uuid) {
327 let mut cache = self.workflows.write().unwrap();
328 cache.invalidate(id);
329
330 if self.config.enable_metrics {
331 let mut metrics = self.metrics.write().unwrap();
332 metrics.invalidations += 1;
333 }
334 }
335
336 pub fn get_user_quota(&self, user_id: &Uuid) -> Option<UserQuota> {
340 let mut cache = self.user_quotas.write().unwrap();
341 let result = cache.get(user_id);
342
343 if self.config.enable_metrics {
344 let mut metrics = self.metrics.write().unwrap();
345 if result.is_some() {
346 metrics.user_quota_hits += 1;
347 } else {
348 metrics.user_quota_misses += 1;
349 }
350 }
351
352 result
353 }
354
355 pub fn put_user_quota(&self, user_id: Uuid, quota: UserQuota) {
357 let mut cache = self.user_quotas.write().unwrap();
358 cache.put(user_id, quota, self.config.default_ttl);
359 }
360
361 pub fn invalidate_user_quota(&self, user_id: &Uuid) {
363 let mut cache = self.user_quotas.write().unwrap();
364 cache.invalidate(user_id);
365
366 if self.config.enable_metrics {
367 let mut metrics = self.metrics.write().unwrap();
368 metrics.invalidations += 1;
369 }
370 }
371
372 pub fn get_workflow_quota(&self, workflow_id: &Uuid) -> Option<WorkflowQuota> {
376 let mut cache = self.workflow_quotas.write().unwrap();
377 let result = cache.get(workflow_id);
378
379 if self.config.enable_metrics {
380 let mut metrics = self.metrics.write().unwrap();
381 if result.is_some() {
382 metrics.workflow_quota_hits += 1;
383 } else {
384 metrics.workflow_quota_misses += 1;
385 }
386 }
387
388 result
389 }
390
391 pub fn put_workflow_quota(&self, workflow_id: Uuid, quota: WorkflowQuota) {
393 let mut cache = self.workflow_quotas.write().unwrap();
394 cache.put(workflow_id, quota, self.config.default_ttl);
395 }
396
397 pub fn invalidate_workflow_quota(&self, workflow_id: &Uuid) {
399 let mut cache = self.workflow_quotas.write().unwrap();
400 cache.invalidate(workflow_id);
401
402 if self.config.enable_metrics {
403 let mut metrics = self.metrics.write().unwrap();
404 metrics.invalidations += 1;
405 }
406 }
407
408 pub fn get_api_key(&self, key_hash: &str) -> Option<Vec<u8>> {
412 let mut cache = self.api_keys.write().unwrap();
413 let result = cache.get(&key_hash.to_string());
414
415 if self.config.enable_metrics {
416 let mut metrics = self.metrics.write().unwrap();
417 if result.is_some() {
418 metrics.api_key_hits += 1;
419 } else {
420 metrics.api_key_misses += 1;
421 }
422 }
423
424 result
425 }
426
427 pub fn put_api_key(&self, key_hash: String, encrypted_key: Vec<u8>) {
429 let mut cache = self.api_keys.write().unwrap();
430 cache.put(key_hash, encrypted_key, self.config.default_ttl);
431 }
432
433 pub fn invalidate_api_key(&self, key_hash: &str) {
435 let mut cache = self.api_keys.write().unwrap();
436 cache.invalidate(&key_hash.to_string());
437
438 if self.config.enable_metrics {
439 let mut metrics = self.metrics.write().unwrap();
440 metrics.invalidations += 1;
441 }
442 }
443
444 pub fn clear_all(&self) {
448 self.workflows.write().unwrap().clear();
449 self.user_quotas.write().unwrap().clear();
450 self.workflow_quotas.write().unwrap().clear();
451 self.api_keys.write().unwrap().clear();
452 }
453
454 pub fn evict_expired(&self) {
456 self.workflows.write().unwrap().evict_expired();
457 self.user_quotas.write().unwrap().evict_expired();
458 self.workflow_quotas.write().unwrap().evict_expired();
459 self.api_keys.write().unwrap().evict_expired();
460 }
461
462 pub fn stats(&self) -> HashMap<String, CacheStats> {
464 let mut stats = HashMap::new();
465 stats.insert(
466 "workflows".to_string(),
467 self.workflows.read().unwrap().stats(),
468 );
469 stats.insert(
470 "user_quotas".to_string(),
471 self.user_quotas.read().unwrap().stats(),
472 );
473 stats.insert(
474 "workflow_quotas".to_string(),
475 self.workflow_quotas.read().unwrap().stats(),
476 );
477 stats.insert(
478 "api_keys".to_string(),
479 self.api_keys.read().unwrap().stats(),
480 );
481 stats
482 }
483
484 pub fn metrics(&self) -> CacheMetrics {
486 self.metrics.read().unwrap().clone()
487 }
488
489 pub fn reset_metrics(&self) {
491 let mut metrics = self.metrics.write().unwrap();
492 *metrics = CacheMetrics::default();
493 }
494
495 pub fn export_metrics(&self) -> HashMap<String, f64> {
497 let metrics = self.metrics.read().unwrap();
498 let mut export = HashMap::new();
499
500 export.insert("workflow_hits".to_string(), metrics.workflow_hits as f64);
501 export.insert(
502 "workflow_misses".to_string(),
503 metrics.workflow_misses as f64,
504 );
505 export.insert("workflow_hit_rate".to_string(), metrics.workflow_hit_rate());
506
507 export.insert(
508 "user_quota_hits".to_string(),
509 metrics.user_quota_hits as f64,
510 );
511 export.insert(
512 "user_quota_misses".to_string(),
513 metrics.user_quota_misses as f64,
514 );
515 export.insert(
516 "user_quota_hit_rate".to_string(),
517 metrics.user_quota_hit_rate(),
518 );
519
520 export.insert(
521 "workflow_quota_hits".to_string(),
522 metrics.workflow_quota_hits as f64,
523 );
524 export.insert(
525 "workflow_quota_misses".to_string(),
526 metrics.workflow_quota_misses as f64,
527 );
528
529 export.insert("api_key_hits".to_string(), metrics.api_key_hits as f64);
530 export.insert("api_key_misses".to_string(), metrics.api_key_misses as f64);
531
532 export.insert("overall_hit_rate".to_string(), metrics.overall_hit_rate());
533 export.insert("evictions".to_string(), metrics.evictions as f64);
534 export.insert("invalidations".to_string(), metrics.invalidations as f64);
535
536 let stats = self.stats();
538 for (cache_name, cache_stats) in stats {
539 export.insert(format!("{cache_name}_size"), cache_stats.size as f64);
540 export.insert(
541 format!("{cache_name}_utilization"),
542 cache_stats.utilization(),
543 );
544 }
545
546 export
547 }
548}
549
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal workflow row whose `id` field matches `id`.
    fn sample_workflow(id: Uuid, name: &str) -> WorkflowRow {
        WorkflowRow {
            id: id.to_string(),
            name: name.to_string(),
            description: None,
            definition: serde_json::to_string(&serde_json::json!({})).unwrap(),
            version: 1,
            tags: None,
            created_at: Utc::now().to_rfc3339(),
            updated_at: Utc::now().to_rfc3339(),
        }
    }

    /// Builds a cache with a 60-second TTL and the given capacity and
    /// metrics setting.
    fn cache_with(max_size: usize, enable_metrics: bool) -> Cache {
        Cache::new(CacheConfig {
            max_size,
            default_ttl: std::time::Duration::from_secs(60),
            enable_metrics,
        })
    }

    #[test]
    fn test_cache_basic_operations() {
        let cache = cache_with(10, true);
        let workflow_id = Uuid::new_v4();

        cache.put_workflow(workflow_id, sample_workflow(workflow_id, "test"));

        // A stored workflow is returned and counted as a hit.
        let cached = cache.get_workflow(&workflow_id);
        assert!(cached.is_some());
        assert_eq!(cached.unwrap().id, workflow_id.to_string());

        let metrics = cache.metrics();
        assert_eq!(metrics.workflow_hits, 1);
        assert_eq!(metrics.workflow_misses, 0);

        // After invalidation the same lookup is a miss.
        cache.invalidate_workflow(&workflow_id);
        assert!(cache.get_workflow(&workflow_id).is_none());

        let metrics = cache.metrics();
        assert_eq!(metrics.workflow_hits, 1);
        assert_eq!(metrics.workflow_misses, 1);
    }

    #[test]
    fn test_cache_lru_eviction() {
        let cache = cache_with(2, false);

        let id1 = Uuid::new_v4();
        let id2 = Uuid::new_v4();
        let id3 = Uuid::new_v4();

        cache.put_workflow(id1, sample_workflow(id1, "test1"));
        cache.put_workflow(id2, sample_workflow(id2, "test2"));

        // Touch id1 so that id2 becomes the least recently used entry.
        cache.get_workflow(&id1);

        // The cache is full, so inserting id3 must evict id2.
        cache.put_workflow(id3, sample_workflow(id3, "test3"));

        // Without this check the test would also pass for an unbounded cache.
        assert!(cache.get_workflow(&id2).is_none());
        assert!(cache.get_workflow(&id1).is_some());
        assert!(cache.get_workflow(&id3).is_some());
    }

    #[test]
    fn test_cache_stats() {
        let cache = cache_with(100, true);

        let stats = cache.stats();
        assert_eq!(stats.get("workflows").unwrap().size, 0);
        assert_eq!(stats.get("workflows").unwrap().capacity, 100);
    }

    #[test]
    fn test_cache_metrics_hit_rate() {
        let metrics = CacheMetrics {
            workflow_hits: 80,
            workflow_misses: 20,
            user_quota_hits: 90,
            user_quota_misses: 10,
            ..Default::default()
        };

        assert_eq!(metrics.workflow_hit_rate(), 0.8);
        assert_eq!(metrics.user_quota_hit_rate(), 0.9);
        assert_eq!(metrics.overall_hit_rate(), 0.85);
    }
}