Skip to main content

omnigraph/
runtime_cache.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3
4use omnigraph_compiler::catalog::Catalog;
5use tokio::sync::Mutex;
6
7use crate::db::ResolvedTarget;
8use crate::error::Result;
9use crate::graph_index::GraphIndex;
10
11#[derive(Debug, Clone, PartialEq, Eq, Hash)]
12struct GraphIndexCacheKey {
13    snapshot_id: String,
14    edge_tables: Vec<GraphIndexTableState>,
15}
16
17#[derive(Debug, Clone, PartialEq, Eq, Hash)]
18struct GraphIndexTableState {
19    table_key: String,
20    table_version: u64,
21    table_branch: Option<String>,
22}
23
24#[derive(Debug, Default)]
25pub struct RuntimeCache {
26    graph_indices: Mutex<GraphIndexCache>,
27}
28
29#[derive(Debug, Default)]
30struct GraphIndexCache {
31    entries: HashMap<GraphIndexCacheKey, Arc<GraphIndex>>,
32    lru: VecDeque<GraphIndexCacheKey>,
33}
34
35impl RuntimeCache {
36    pub async fn invalidate_all(&self) {
37        let mut cache = self.graph_indices.lock().await;
38        cache.entries.clear();
39        cache.lru.clear();
40    }
41
42    pub async fn graph_index(
43        &self,
44        resolved: &ResolvedTarget,
45        catalog: &Catalog,
46    ) -> Result<Arc<GraphIndex>> {
47        let key = graph_index_cache_key(resolved, catalog);
48        {
49            let mut cache = self.graph_indices.lock().await;
50            if let Some(index) = cache.entries.get(&key).cloned() {
51                cache.touch(key.clone());
52                return Ok(index);
53            }
54        }
55
56        let edge_types = catalog
57            .edge_types
58            .iter()
59            .map(|(name, et)| (name.clone(), (et.from_type.clone(), et.to_type.clone())))
60            .collect();
61
62        let index = Arc::new(GraphIndex::build(&resolved.snapshot, &edge_types).await?);
63        let mut cache = self.graph_indices.lock().await;
64        if let Some(existing) = cache.entries.get(&key).cloned() {
65            cache.touch(key);
66            return Ok(existing);
67        }
68        cache.insert(key, Arc::clone(&index));
69        Ok(index)
70    }
71}
72
73impl GraphIndexCache {
74    fn insert(&mut self, key: GraphIndexCacheKey, value: Arc<GraphIndex>) {
75        self.entries.insert(key.clone(), value);
76        self.touch(key);
77        while self.entries.len() > 8 {
78            let Some(oldest) = self.lru.pop_front() else {
79                break;
80            };
81            if self.entries.remove(&oldest).is_some() {
82                break;
83            }
84        }
85    }
86
87    fn touch(&mut self, key: GraphIndexCacheKey) {
88        self.lru.retain(|existing| existing != &key);
89        self.lru.push_back(key);
90    }
91}
92
93fn graph_index_cache_key(resolved: &ResolvedTarget, catalog: &Catalog) -> GraphIndexCacheKey {
94    let mut edge_tables: Vec<GraphIndexTableState> = catalog
95        .edge_types
96        .keys()
97        .filter_map(|edge_name| {
98            let table_key = format!("edge:{}", edge_name);
99            resolved
100                .snapshot
101                .entry(&table_key)
102                .map(|entry| GraphIndexTableState {
103                    table_key,
104                    table_version: entry.table_version,
105                    table_branch: entry.table_branch.clone(),
106                })
107        })
108        .collect();
109    edge_tables.sort_by(|a, b| a.table_key.cmp(&b.table_key));
110
111    GraphIndexCacheKey {
112        snapshot_id: resolved.snapshot_id.as_str().to_string(),
113        edge_tables,
114    }
115}
116
117#[cfg(test)]
118mod tests {
119    use std::sync::Arc;
120
121    use super::*;
122
123    fn key(id: usize) -> GraphIndexCacheKey {
124        GraphIndexCacheKey {
125            snapshot_id: format!("snap-{id}"),
126            edge_tables: Vec::new(),
127        }
128    }
129
130    fn empty_index() -> Arc<GraphIndex> {
131        Arc::new(GraphIndex::empty_for_test())
132    }
133
134    #[test]
135    fn graph_index_cache_evicts_oldest_entry() {
136        let mut cache = GraphIndexCache::default();
137        for idx in 0..9 {
138            cache.insert(key(idx), empty_index());
139        }
140
141        assert_eq!(cache.entries.len(), 8);
142        assert!(!cache.entries.contains_key(&key(0)));
143        assert!(cache.entries.contains_key(&key(8)));
144    }
145
146    #[test]
147    fn graph_index_cache_touch_keeps_recent_entry() {
148        let mut cache = GraphIndexCache::default();
149        for idx in 0..8 {
150            cache.insert(key(idx), empty_index());
151        }
152
153        cache.touch(key(0));
154        cache.insert(key(8), empty_index());
155
156        assert!(cache.entries.contains_key(&key(0)));
157        assert!(!cache.entries.contains_key(&key(1)));
158    }
159}