omnigraph/
runtime_cache.rs1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3
4use omnigraph_compiler::catalog::Catalog;
5use tokio::sync::Mutex;
6
7use crate::db::ResolvedTarget;
8use crate::error::Result;
9use crate::graph_index::GraphIndex;
10
11#[derive(Debug, Clone, PartialEq, Eq, Hash)]
12struct GraphIndexCacheKey {
13 snapshot_id: String,
14 edge_tables: Vec<GraphIndexTableState>,
15}
16
17#[derive(Debug, Clone, PartialEq, Eq, Hash)]
18struct GraphIndexTableState {
19 table_key: String,
20 table_version: u64,
21 table_branch: Option<String>,
22}
23
24#[derive(Debug, Default)]
25pub struct RuntimeCache {
26 graph_indices: Mutex<GraphIndexCache>,
27}
28
29#[derive(Debug, Default)]
30struct GraphIndexCache {
31 entries: HashMap<GraphIndexCacheKey, Arc<GraphIndex>>,
32 lru: VecDeque<GraphIndexCacheKey>,
33}
34
35impl RuntimeCache {
36 pub async fn invalidate_all(&self) {
37 let mut cache = self.graph_indices.lock().await;
38 cache.entries.clear();
39 cache.lru.clear();
40 }
41
42 pub async fn graph_index(
43 &self,
44 resolved: &ResolvedTarget,
45 catalog: &Catalog,
46 ) -> Result<Arc<GraphIndex>> {
47 let key = graph_index_cache_key(resolved, catalog);
48 {
49 let mut cache = self.graph_indices.lock().await;
50 if let Some(index) = cache.entries.get(&key).cloned() {
51 cache.touch(key.clone());
52 return Ok(index);
53 }
54 }
55
56 let edge_types = catalog
57 .edge_types
58 .iter()
59 .map(|(name, et)| (name.clone(), (et.from_type.clone(), et.to_type.clone())))
60 .collect();
61
62 let index = Arc::new(GraphIndex::build(&resolved.snapshot, &edge_types).await?);
63 let mut cache = self.graph_indices.lock().await;
64 if let Some(existing) = cache.entries.get(&key).cloned() {
65 cache.touch(key);
66 return Ok(existing);
67 }
68 cache.insert(key, Arc::clone(&index));
69 Ok(index)
70 }
71}
72
73impl GraphIndexCache {
74 fn insert(&mut self, key: GraphIndexCacheKey, value: Arc<GraphIndex>) {
75 self.entries.insert(key.clone(), value);
76 self.touch(key);
77 while self.entries.len() > 8 {
78 let Some(oldest) = self.lru.pop_front() else {
79 break;
80 };
81 if self.entries.remove(&oldest).is_some() {
82 break;
83 }
84 }
85 }
86
87 fn touch(&mut self, key: GraphIndexCacheKey) {
88 self.lru.retain(|existing| existing != &key);
89 self.lru.push_back(key);
90 }
91}
92
93fn graph_index_cache_key(resolved: &ResolvedTarget, catalog: &Catalog) -> GraphIndexCacheKey {
94 let mut edge_tables: Vec<GraphIndexTableState> = catalog
95 .edge_types
96 .keys()
97 .filter_map(|edge_name| {
98 let table_key = format!("edge:{}", edge_name);
99 resolved
100 .snapshot
101 .entry(&table_key)
102 .map(|entry| GraphIndexTableState {
103 table_key,
104 table_version: entry.table_version,
105 table_branch: entry.table_branch.clone(),
106 })
107 })
108 .collect();
109 edge_tables.sort_by(|a, b| a.table_key.cmp(&b.table_key));
110
111 GraphIndexCacheKey {
112 snapshot_id: resolved.snapshot_id.as_str().to_string(),
113 edge_tables,
114 }
115}
116
117#[cfg(test)]
118mod tests {
119 use std::sync::Arc;
120
121 use super::*;
122
123 fn key(id: usize) -> GraphIndexCacheKey {
124 GraphIndexCacheKey {
125 snapshot_id: format!("snap-{id}"),
126 edge_tables: Vec::new(),
127 }
128 }
129
130 fn empty_index() -> Arc<GraphIndex> {
131 Arc::new(GraphIndex::empty_for_test())
132 }
133
134 #[test]
135 fn graph_index_cache_evicts_oldest_entry() {
136 let mut cache = GraphIndexCache::default();
137 for idx in 0..9 {
138 cache.insert(key(idx), empty_index());
139 }
140
141 assert_eq!(cache.entries.len(), 8);
142 assert!(!cache.entries.contains_key(&key(0)));
143 assert!(cache.entries.contains_key(&key(8)));
144 }
145
146 #[test]
147 fn graph_index_cache_touch_keeps_recent_entry() {
148 let mut cache = GraphIndexCache::default();
149 for idx in 0..8 {
150 cache.insert(key(idx), empty_index());
151 }
152
153 cache.touch(key(0));
154 cache.insert(key(8), empty_index());
155
156 assert!(cache.entries.contains_key(&key(0)));
157 assert!(!cache.entries.contains_key(&key(1)));
158 }
159}