1use std::collections::{HashMap, VecDeque};
2use std::hash::Hash;
3use std::sync::Arc;
4
5use lance::Dataset;
6use lance::session::Session;
7use omnigraph_compiler::catalog::Catalog;
8use tokio::sync::Mutex;
9
10use crate::db::ResolvedTarget;
11use crate::error::Result;
12use crate::graph_index::GraphIndex;
13
14#[derive(Debug, Clone, PartialEq, Eq, Hash)]
15struct GraphIndexCacheKey {
16 snapshot_id: String,
17 edge_tables: Vec<GraphIndexTableState>,
18}
19
/// Version and branch of a single edge table, as recorded in a snapshot entry.
///
/// Two states are equal (and hash alike) only when all three fields match.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct GraphIndexTableState {
    /// Snapshot lookup key for the table; `graph_index_cache_key` builds it as
    /// `edge:<edge type name>`.
    table_key: String,
    /// Table version copied from the snapshot entry.
    table_version: u64,
    /// Table branch copied from the snapshot entry, when one is set.
    table_branch: Option<String>,
}
26
27#[derive(Debug, Default)]
28pub struct RuntimeCache {
29 graph_indices: Mutex<GraphIndexCache>,
30}
31
32#[derive(Debug)]
33struct GraphIndexCache {
34 entries: LruMap<GraphIndexCacheKey, Arc<GraphIndex>>,
35}
36
37impl RuntimeCache {
38 pub async fn invalidate_all(&self) {
39 let mut cache = self.graph_indices.lock().await;
40 cache.entries.invalidate_all();
41 }
42
43 pub async fn graph_index(
44 &self,
45 resolved: &ResolvedTarget,
46 catalog: &Catalog,
47 ) -> Result<Arc<GraphIndex>> {
48 let key = graph_index_cache_key(resolved, catalog);
49 {
50 let mut cache = self.graph_indices.lock().await;
51 if let Some(index) = cache.entries.get(&key).cloned() {
52 return Ok(index);
53 }
54 }
55
56 let edge_types = catalog
57 .edge_types
58 .iter()
59 .map(|(name, et)| (name.clone(), (et.from_type.clone(), et.to_type.clone())))
60 .collect();
61
62 let index = Arc::new(GraphIndex::build(&resolved.snapshot, &edge_types).await?);
63 let mut cache = self.graph_indices.lock().await;
64 if let Some(existing) = cache.entries.get(&key).cloned() {
65 return Ok(existing);
66 }
67 cache.insert(key, Arc::clone(&index));
68 Ok(index)
69 }
70}
71
72impl GraphIndexCache {
73 fn insert(&mut self, key: GraphIndexCacheKey, value: Arc<GraphIndex>) {
74 self.entries.insert(key, value);
75 }
76
77 #[cfg(test)]
78 fn touch(&mut self, key: GraphIndexCacheKey) {
79 self.entries.touch(key);
80 }
81}
82
/// Fixed-capacity map that evicts its least recently used entry on overflow.
///
/// `entries` owns the values. `lru` lists the keys of `entries` in recency
/// order: the front is the next eviction candidate and the back is the most
/// recently used key. Recency updates scan `lru` linearly, so the map is
/// intended for small capacities.
#[derive(Debug)]
struct LruMap<K, V>
where
    K: Clone + Eq + Hash,
{
    entries: HashMap<K, V>,
    lru: VecDeque<K>,
    cap: usize,
}

impl<K, V> LruMap<K, V>
where
    K: Clone + Eq + Hash,
{
    /// Creates an empty map that keeps at most `cap` entries.
    ///
    /// With `cap == 0`, every inserted entry is evicted again immediately.
    fn new(cap: usize) -> Self {
        Self {
            entries: HashMap::new(),
            lru: VecDeque::new(),
            cap,
        }
    }

    /// Returns the value stored under `key` and marks it most recently used.
    ///
    /// Returns `None`, without changing the recency order, when `key` is absent.
    fn get(&mut self, key: &K) -> Option<&V> {
        if !self.entries.contains_key(key) {
            return None;
        }
        self.touch(key.clone());
        self.entries.get(key)
    }

    /// Inserts or replaces the value for `key` and marks it most recently used.
    ///
    /// Afterwards, least recently used entries are evicted until at most `cap`
    /// entries remain.
    fn insert(&mut self, key: K, value: V) {
        self.entries.insert(key.clone(), value);
        self.touch(key);
        while self.entries.len() > self.cap {
            let Some(oldest) = self.lru.pop_front() else {
                break;
            };
            self.entries.remove(&oldest);
        }
    }

    /// Removes every entry and clears the recency order.
    fn invalidate_all(&mut self) {
        self.entries.clear();
        self.lru.clear();
    }

    /// Test-only: reports whether `key` currently has a value.
    #[cfg(test)]
    fn contains_key(&self, key: &K) -> bool {
        self.entries.contains_key(key)
    }

    /// Test-only: number of stored entries.
    #[cfg(test)]
    fn len(&self) -> usize {
        self.entries.len()
    }

    /// Marks `key` as most recently used.
    ///
    /// Keys without a stored value are ignored. Otherwise a stray `touch` would
    /// leave a key in `lru` that has no value and lingers until it reaches the
    /// front of the queue.
    fn touch(&mut self, key: K) {
        if !self.entries.contains_key(&key) {
            return;
        }
        // Drop the key's previous position before re-queuing it at the back.
        self.lru.retain(|existing| existing != &key);
        self.lru.push_back(key);
    }
}
145
146impl Default for GraphIndexCache {
147 fn default() -> Self {
148 Self {
149 entries: LruMap::new(8),
150 }
151 }
152}
153
154fn graph_index_cache_key(resolved: &ResolvedTarget, catalog: &Catalog) -> GraphIndexCacheKey {
155 let mut edge_tables: Vec<GraphIndexTableState> = catalog
156 .edge_types
157 .keys()
158 .filter_map(|edge_name| {
159 let table_key = format!("edge:{}", edge_name);
160 resolved
161 .snapshot
162 .entry(&table_key)
163 .map(|entry| GraphIndexTableState {
164 table_key,
165 table_version: entry.table_version,
166 table_branch: entry.table_branch.clone(),
167 })
168 })
169 .collect();
170 edge_tables.sort_by(|a, b| a.table_key.cmp(&b.table_key));
171
172 GraphIndexCacheKey {
173 snapshot_id: resolved.snapshot_id.as_str().to_string(),
174 edge_tables,
175 }
176}
177
/// Maximum number of dataset handles kept by the table handle cache's LRU map.
const TABLE_HANDLE_CACHE_CAP: usize = 64;
184
/// Cache key for one opened table dataset handle.
///
/// Handles are shared only between requests whose four fields all match.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct TableHandleKey {
    /// Table path supplied by the caller.
    table_path: String,
    /// Table branch supplied by the caller, when there is one.
    table_branch: Option<String>,
    /// Table version the handle was opened at.
    version: u64,
    /// Optional `e_tag` value supplied by the caller.
    e_tag: Option<String>,
}
192
193#[derive(Default)]
203pub struct TableHandleCache {
204 inner: Mutex<TableHandleCacheInner>,
205}
206
207struct TableHandleCacheInner {
208 entries: LruMap<TableHandleKey, Dataset>,
209}
210
211impl TableHandleCache {
212 pub async fn invalidate_all(&self) {
216 let mut inner = self.inner.lock().await;
217 inner.entries.invalidate_all();
218 }
219
220 pub async fn get_or_open(
224 &self,
225 table_path: &str,
226 table_branch: Option<&str>,
227 version: u64,
228 e_tag: Option<&str>,
229 location: &str,
230 session: Option<&Arc<Session>>,
231 ) -> Result<Dataset> {
232 let key = TableHandleKey {
233 table_path: table_path.to_string(),
234 table_branch: table_branch.map(str::to_string),
235 version,
236 e_tag: e_tag.map(str::to_string),
237 };
238 {
239 let mut inner = self.inner.lock().await;
240 if let Some(ds) = inner.entries.get(&key).cloned() {
241 return Ok(ds);
242 }
243 }
244 let ds = crate::instrumentation::open_table_dataset(location, version, session).await?;
248 let mut inner = self.inner.lock().await;
249 if let Some(existing) = inner.entries.get(&key).cloned() {
250 return Ok(existing);
251 }
252 inner.insert(key, ds.clone());
253 Ok(ds)
254 }
255}
256
257impl TableHandleCacheInner {
258 fn insert(&mut self, key: TableHandleKey, value: Dataset) {
259 self.entries.insert(key, value);
260 }
261}
262
263impl Default for TableHandleCacheInner {
264 fn default() -> Self {
265 Self {
266 entries: LruMap::new(TABLE_HANDLE_CACHE_CAP),
267 }
268 }
269}
270
271pub struct ReadCaches {
276 pub session: Arc<Session>,
277 pub handles: Arc<TableHandleCache>,
278}
279
280impl std::fmt::Debug for ReadCaches {
281 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282 f.debug_struct("ReadCaches").finish_non_exhaustive()
283 }
284}
285
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;

    /// Cache key with snapshot id `snap-{id}` and no edge tables.
    fn key(id: usize) -> GraphIndexCacheKey {
        let snapshot_id = format!("snap-{id}");
        GraphIndexCacheKey {
            snapshot_id,
            edge_tables: Vec::new(),
        }
    }

    /// Shared handle to an empty graph index.
    fn empty_index() -> Arc<GraphIndex> {
        let index = GraphIndex::empty_for_test();
        Arc::new(index)
    }

    #[test]
    fn graph_index_cache_evicts_oldest_entry() {
        let mut cache = GraphIndexCache::default();
        // Nine inserts: one more than the default capacity of eight.
        (0..9).for_each(|idx| cache.insert(key(idx), empty_index()));

        let entries = &cache.entries;
        assert_eq!(entries.len(), 8);
        assert!(!entries.contains_key(&key(0)));
        assert!(entries.contains_key(&key(8)));
    }

    #[test]
    fn graph_index_cache_touch_keeps_recent_entry() {
        let mut cache = GraphIndexCache::default();
        // Fill the cache exactly to capacity.
        (0..8).for_each(|idx| cache.insert(key(idx), empty_index()));

        // Refreshing key 0 makes key 1 the eviction candidate for the next insert.
        cache.touch(key(0));
        cache.insert(key(8), empty_index());

        let entries = &cache.entries;
        assert!(entries.contains_key(&key(0)));
        assert!(!entries.contains_key(&key(1)));
    }

    #[test]
    fn lru_map_evicts_oldest_and_touch_refreshes_order() {
        let mut map = LruMap::new(2);
        for (name, value) in [("a", 1), ("b", 2)] {
            map.insert(name, value);
        }

        // Reading "a" leaves "b" as the least recently used key.
        assert_eq!(map.get(&"a"), Some(&1));
        map.insert("c", 3);

        assert!(map.contains_key(&"a"));
        assert!(!map.contains_key(&"b"));
        assert!(map.contains_key(&"c"));

        map.invalidate_all();
        assert_eq!(map.len(), 0);
    }
}