Skip to main content

omnigraph/
runtime_cache.rs

1use std::collections::{HashMap, VecDeque};
2use std::hash::Hash;
3use std::sync::Arc;
4
5use lance::Dataset;
6use lance::session::Session;
7use omnigraph_compiler::catalog::Catalog;
8use tokio::sync::Mutex;
9
10use crate::db::ResolvedTarget;
11use crate::error::Result;
12use crate::graph_index::GraphIndex;
13
/// Key identifying one cached `GraphIndex`.
///
/// Built by `graph_index_cache_key` from the resolved target's snapshot id and
/// the per-edge-table state found in that snapshot.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct GraphIndexCacheKey {
    // Snapshot id of the resolved target, stored as an owned string.
    snapshot_id: String,
    // One entry per catalog edge type that has a snapshot entry, sorted by
    // `table_key` when produced by `graph_index_cache_key`.
    edge_tables: Vec<GraphIndexTableState>,
}
19
/// State of a single edge table as recorded in a snapshot entry; part of
/// `GraphIndexCacheKey`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct GraphIndexTableState {
    // Snapshot lookup key, formatted as `edge:<edge type name>`.
    table_key: String,
    // `table_version` copied from the snapshot entry.
    table_version: u64,
    // `table_branch` copied from the snapshot entry (`None` when the entry has none).
    table_branch: Option<String>,
}
26
/// Runtime cache of built graph indices, shared behind an async mutex.
#[derive(Debug, Default)]
pub struct RuntimeCache {
    // Guarded LRU of built indices; locked only around lookups and inserts,
    // never while an index is being built.
    graph_indices: Mutex<GraphIndexCache>,
}
31
/// LRU-bounded store of built graph indices, keyed by `GraphIndexCacheKey`.
#[derive(Debug)]
struct GraphIndexCache {
    // Values are `Arc`s so a cache hit hands out a shared handle to the index.
    entries: LruMap<GraphIndexCacheKey, Arc<GraphIndex>>,
}
36
37impl RuntimeCache {
38    pub async fn invalidate_all(&self) {
39        let mut cache = self.graph_indices.lock().await;
40        cache.entries.invalidate_all();
41    }
42
43    pub async fn graph_index(
44        &self,
45        resolved: &ResolvedTarget,
46        catalog: &Catalog,
47    ) -> Result<Arc<GraphIndex>> {
48        let key = graph_index_cache_key(resolved, catalog);
49        {
50            let mut cache = self.graph_indices.lock().await;
51            if let Some(index) = cache.entries.get(&key).cloned() {
52                return Ok(index);
53            }
54        }
55
56        let edge_types = catalog
57            .edge_types
58            .iter()
59            .map(|(name, et)| (name.clone(), (et.from_type.clone(), et.to_type.clone())))
60            .collect();
61
62        let index = Arc::new(GraphIndex::build(&resolved.snapshot, &edge_types).await?);
63        let mut cache = self.graph_indices.lock().await;
64        if let Some(existing) = cache.entries.get(&key).cloned() {
65            return Ok(existing);
66        }
67        cache.insert(key, Arc::clone(&index));
68        Ok(index)
69    }
70}
71
72impl GraphIndexCache {
73    fn insert(&mut self, key: GraphIndexCacheKey, value: Arc<GraphIndex>) {
74        self.entries.insert(key, value);
75    }
76
77    #[cfg(test)]
78    fn touch(&mut self, key: GraphIndexCacheKey) {
79        self.entries.touch(key);
80    }
81}
82
/// A small fixed-capacity map with least-recently-used eviction.
///
/// `entries` holds the values. `lru` lists the stored keys ordered from least
/// recently used (front) to most recently used (back). Recency updates scan
/// `lru` linearly, which is fine for the small capacities used in this file.
#[derive(Debug)]
struct LruMap<K, V>
where
    K: Clone + Eq + Hash,
{
    entries: HashMap<K, V>,
    lru: VecDeque<K>,
    cap: usize,
}

impl<K, V> LruMap<K, V>
where
    K: Clone + Eq + Hash,
{
    /// Creates an empty map that keeps at most `cap` entries.
    fn new(cap: usize) -> Self {
        Self {
            entries: HashMap::new(),
            lru: VecDeque::new(),
            cap,
        }
    }

    /// Returns the value stored under `key` and marks it most recently used.
    ///
    /// Returns `None` (and changes nothing) when `key` is not stored.
    fn get(&mut self, key: &K) -> Option<&V> {
        // Single hash lookup; `entries` and `lru` are disjoint fields, so the
        // returned borrow can stay alive while the recency list is updated.
        let value = self.entries.get(key)?;
        match self.lru.iter().position(|existing| existing == key) {
            // Move the key we already own to the back instead of cloning `key`.
            Some(pos) => {
                if let Some(owned) = self.lru.remove(pos) {
                    self.lru.push_back(owned);
                }
            }
            // Defensive: a stored key should always be tracked; track it now.
            None => self.lru.push_back(key.clone()),
        }
        Some(value)
    }

    /// Inserts or overwrites `key`, marks it most recently used, then evicts
    /// least-recently-used entries until at most `cap` remain.
    fn insert(&mut self, key: K, value: V) {
        self.entries.insert(key.clone(), value);
        self.touch(key);
        while self.entries.len() > self.cap {
            let Some(oldest) = self.lru.pop_front() else {
                break;
            };
            self.entries.remove(&oldest);
        }
    }

    /// Removes every entry and all recency information.
    fn invalidate_all(&mut self) {
        self.entries.clear();
        self.lru.clear();
    }

    /// Test helper: reports whether `key` is stored, without touching recency.
    #[cfg(test)]
    fn contains_key(&self, key: &K) -> bool {
        self.entries.contains_key(key)
    }

    /// Test helper: number of stored entries.
    #[cfg(test)]
    fn len(&self) -> usize {
        self.entries.len()
    }

    /// Marks `key` as most recently used.
    ///
    /// Does nothing when `key` is not stored, so `lru` never accumulates keys
    /// that have no matching entry.
    fn touch(&mut self, key: K) {
        if !self.entries.contains_key(&key) {
            return;
        }
        self.lru.retain(|existing| existing != &key);
        self.lru.push_back(key);
    }
}
145
146impl Default for GraphIndexCache {
147    fn default() -> Self {
148        Self {
149            entries: LruMap::new(8),
150        }
151    }
152}
153
154fn graph_index_cache_key(resolved: &ResolvedTarget, catalog: &Catalog) -> GraphIndexCacheKey {
155    let mut edge_tables: Vec<GraphIndexTableState> = catalog
156        .edge_types
157        .keys()
158        .filter_map(|edge_name| {
159            let table_key = format!("edge:{}", edge_name);
160            resolved
161                .snapshot
162                .entry(&table_key)
163                .map(|entry| GraphIndexTableState {
164                    table_key,
165                    table_version: entry.table_version,
166                    table_branch: entry.table_branch.clone(),
167                })
168        })
169        .collect();
170    edge_tables.sort_by(|a, b| a.table_key.cmp(&b.table_key));
171
172    GraphIndexCacheKey {
173        snapshot_id: resolved.snapshot_id.as_str().to_string(),
174        edge_tables,
175    }
176}
177
/// Maximum number of held `Dataset` handles. A handle holds only Arcs (object
/// store + manifest), never table data, so this is cheap; it bounds how many
/// `(table, branch, version, e_tag)` cells stay warm. One graph's live table set
/// across a couple of branches at the current version fits comfortably, with
/// headroom for the recently superseded versions left by writes until they age
/// out.
const TABLE_HANDLE_CACHE_CAP: usize = 64;
184
/// Key for one held `Dataset` handle in `TableHandleCache`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct TableHandleKey {
    // Table path as passed to `get_or_open`.
    table_path: String,
    // Branch the table is read on, if any.
    table_branch: Option<String>,
    // Table version the handle was opened at.
    version: u64,
    // Optional e_tag supplied by the caller; `None` when the caller has none.
    e_tag: Option<String>,
}
192
/// Held open-`Dataset` handles keyed by `(table_path, branch, version, e_tag)` — the
/// version-keyed analogue of LanceDB's `DatasetConsistencyWrapper`
/// (`rust/lancedb/src/table/dataset.rs`). A warm read reuses a held handle with
/// zero open IO (a cheap `Dataset` clone); a miss opens once at the location with
/// the shared `Session`. Version plus e_tag are in the key, so a write (or a
/// delete/recreate that reuses a version number on object stores with e_tags) is
/// simply a new key. A same-branch manifest refresh clears this cache as the
/// fallback for e_tag-less table locations. Only read-path Data opens use this —
/// writes open HEAD directly and never receive a pinned handle.
#[derive(Default)]
pub struct TableHandleCache {
    // Async mutex around the LRU of handles; released while a dataset is
    // being opened on a miss.
    inner: Mutex<TableHandleCacheInner>,
}
206
/// Lock-protected state of `TableHandleCache`: an LRU map from handle key to
/// the held `Dataset`.
struct TableHandleCacheInner {
    // Capacity is set by the `Default` impl (`TABLE_HANDLE_CACHE_CAP`).
    entries: LruMap<TableHandleKey, Dataset>,
}
210
211impl TableHandleCache {
212    /// Drop all held handles. Correctness never requires this (version-in-key);
213    /// it is memory hygiene, called from the same hooks that clear the graph
214    /// index cache (branch switch / refresh).
215    pub async fn invalidate_all(&self) {
216        let mut inner = self.inner.lock().await;
217        inner.entries.invalidate_all();
218    }
219
220    /// Return the dataset for `(table_path, branch, version, e_tag)`, reusing a
221    /// held handle (0 open IO) or opening it once at `location` with the shared
222    /// `session` on a miss.
223    pub async fn get_or_open(
224        &self,
225        table_path: &str,
226        table_branch: Option<&str>,
227        version: u64,
228        e_tag: Option<&str>,
229        location: &str,
230        session: Option<&Arc<Session>>,
231    ) -> Result<Dataset> {
232        let key = TableHandleKey {
233            table_path: table_path.to_string(),
234            table_branch: table_branch.map(str::to_string),
235            version,
236            e_tag: e_tag.map(str::to_string),
237        };
238        {
239            let mut inner = self.inner.lock().await;
240            if let Some(ds) = inner.entries.get(&key).cloned() {
241                return Ok(ds);
242            }
243        }
244        // Miss: open without holding the lock (the open is async IO). A concurrent
245        // double-miss opens twice and one wins the insert — correct (the dataset
246        // at a version is immutable) and rare.
247        let ds = crate::instrumentation::open_table_dataset(location, version, session).await?;
248        let mut inner = self.inner.lock().await;
249        if let Some(existing) = inner.entries.get(&key).cloned() {
250            return Ok(existing);
251        }
252        inner.insert(key, ds.clone());
253        Ok(ds)
254    }
255}
256
257impl TableHandleCacheInner {
258    fn insert(&mut self, key: TableHandleKey, value: Dataset) {
259        self.entries.insert(key, value);
260    }
261}
262
263impl Default for TableHandleCacheInner {
264    fn default() -> Self {
265        Self {
266            entries: LruMap::new(TABLE_HANDLE_CACHE_CAP),
267        }
268    }
269}
270
/// Per-graph read caches handed to a resolved `Snapshot` so its table opens reuse
/// one shared `Session` (LanceDB's one-session-per-connection pattern) and the
/// held-handle cache. `Debug` is implemented manually because
/// `lance::session::Session` is not `Debug`; this lets `Snapshot` keep its
/// `#[derive(Debug)]`.
pub struct ReadCaches {
    /// Lance session shared by table opens.
    pub session: Arc<Session>,
    /// Shared cache of held open-`Dataset` handles.
    pub handles: Arc<TableHandleCache>,
}
279
280impl std::fmt::Debug for ReadCaches {
281    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282        f.debug_struct("ReadCaches").finish_non_exhaustive()
283    }
284}
285
// Unit tests for the LRU behaviour of `GraphIndexCache` and `LruMap`.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;

    // Builds a distinct cache key per `id` with no edge tables.
    fn key(id: usize) -> GraphIndexCacheKey {
        GraphIndexCacheKey {
            snapshot_id: format!("snap-{id}"),
            edge_tables: Vec::new(),
        }
    }

    // Placeholder index value; the tests only care about key bookkeeping.
    fn empty_index() -> Arc<GraphIndex> {
        Arc::new(GraphIndex::empty_for_test())
    }

    // Inserting one more entry than the default capacity evicts the first key.
    #[test]
    fn graph_index_cache_evicts_oldest_entry() {
        let mut cache = GraphIndexCache::default();
        for idx in 0..9 {
            cache.insert(key(idx), empty_index());
        }

        assert_eq!(cache.entries.len(), 8);
        assert!(!cache.entries.contains_key(&key(0)));
        assert!(cache.entries.contains_key(&key(8)));
    }

    // Touching the oldest key protects it; the next-oldest is evicted instead.
    #[test]
    fn graph_index_cache_touch_keeps_recent_entry() {
        let mut cache = GraphIndexCache::default();
        for idx in 0..8 {
            cache.insert(key(idx), empty_index());
        }

        cache.touch(key(0));
        cache.insert(key(8), empty_index());

        assert!(cache.entries.contains_key(&key(0)));
        assert!(!cache.entries.contains_key(&key(1)));
    }

    // A `get` refreshes recency, so the untouched key is the one evicted;
    // `invalidate_all` then empties the map.
    #[test]
    fn lru_map_evicts_oldest_and_touch_refreshes_order() {
        let mut map = LruMap::new(2);
        map.insert("a", 1);
        map.insert("b", 2);

        // Reading "a" makes "b" the least recently used entry.
        assert_eq!(map.get(&"a"), Some(&1));
        map.insert("c", 3);

        assert!(map.contains_key(&"a"));
        assert!(!map.contains_key(&"b"));
        assert!(map.contains_key(&"c"));

        map.invalidate_all();
        assert_eq!(map.len(), 0);
    }
}