// veles_core/cache.rs
//! Process-wide lockfree-ish cache of loaded `VelesIndex`es.
//!
//! Shared between the MCP and gRPC servers (and anything else that
//! wants to serve searches from in-memory indexes without re-walking
//! `<repo>/.veles` per request).
//!
//! Design notes:
//!
//! - **Storage**: `DashMap<String, CacheEntry>` — sharded internal
//!   locks, so concurrent operations on different repos never contend.
//!   The "lockfree" label is the practical kind: contention is bounded
//!   to a single shard, not the whole map.
//!
//! - **Per-index synchronization**: each cache entry holds an
//!   `Arc<RwLock<VelesIndex>>`. Read-only operations (search, defs,
//!   refs, ...) take a shared read lock; `update_from_path` takes the
//!   exclusive write lock. Two clients searching the same repo proceed
//!   in parallel; an `update` briefly blocks readers.
//!
//! - **Build deduplication**: a `OnceCell` lives inside every entry,
//!   so several concurrent loaders of the same repo cooperate — one
//!   thread does the (slow) walk + embed + load, the others await its
//!   result. No wasted duplicate builds.
//!
//! - **LRU eviction**: each entry stores an `AtomicU64 last_access`.
//!   Every hit / miss bumps a global counter and writes it into the
//!   entry. When the cache exceeds capacity we scan and evict the
//!   smallest. O(N) but N is small (≤ 16 in practice).
//!
//! Tests assume the eviction is "eventually correct", not strictly LRU
//! under contention — two threads concurrently bumping `last_access`
//! on different entries may finish in arbitrary order. For the actual
//! workload (10-slot cache, ~tens of repos per session) this is fine.

35use std::path::Path;
36use std::sync::Arc;
37use std::sync::atomic::{AtomicU64, Ordering};
38
39use anyhow::{Result, bail};
40use dashmap::DashMap;
41use model2vec_rs::model::StaticModel;
42use tokio::sync::{OnceCell, RwLock};
43
44use crate::VelesIndex;
45use crate::persist;
46
/// How many `VelesIndex` entries the cache keeps before evicting the
/// least-recently-used one. `with_capacity` clamps any requested
/// capacity to at least 1, so the cache can always hold the entry it
/// just built.
pub const DEFAULT_CACHE_SIZE: usize = 10;
49
/// One cached index plus the metadata needed for LRU ordering and
/// build deduplication.
struct CacheEntry {
    /// Initialised lazily: the first `get_or_load` for this repo
    /// triggers the build; concurrent callers await the same in-flight
    /// future rather than launching their own. The outer `Arc` lets
    /// the cell be cloned out of the map cheaply, so the (slow) init
    /// runs without holding any DashMap shard lock.
    cell: Arc<OnceCell<Arc<RwLock<VelesIndex>>>>,
    /// Snapshot of the cache's global monotonic counter taken at the
    /// last hit / miss touching this entry. Newer = larger. Updated
    /// with a relaxed atomic store — strict ordering is not required
    /// for best-effort LRU.
    last_access: AtomicU64,
}
61
/// Lockfree-ish process cache of loaded indexes, keyed by repo path
/// or git URL (see module docs for the concurrency design).
pub struct IndexCache {
    /// Repo identifier → cached entry. DashMap shards its internal
    /// locks, so operations on different repos rarely contend.
    entries: DashMap<String, CacheEntry>,
    /// Embedding model cloned into every index build/load.
    model: StaticModel,
    /// Maximum number of entries before eviction kicks in (≥ 1).
    capacity: usize,
    /// Global monotonic clock for LRU ordering; bumped by `tick`.
    counter: AtomicU64,
}
70
71impl IndexCache {
72 /// Build a cache with the default capacity (`DEFAULT_CACHE_SIZE`).
73 pub fn new(model: StaticModel) -> Self {
74 Self::with_capacity(model, DEFAULT_CACHE_SIZE)
75 }
76
77 /// Build a cache with an explicit capacity.
78 pub fn with_capacity(model: StaticModel, capacity: usize) -> Self {
79 Self {
80 entries: DashMap::with_capacity(capacity.max(1)),
81 model,
82 capacity: capacity.max(1),
83 counter: AtomicU64::new(0),
84 }
85 }
86
87 /// Get or lazily build the `VelesIndex` for `repo`.
88 ///
89 /// Returns an `Arc<RwLock<VelesIndex>>` the caller can `.read()` or
90 /// `.write()` independently of the cache lock. Multiple concurrent
91 /// loaders of the same repo share a single in-flight build via the
92 /// internal `OnceCell`.
93 ///
94 /// `repo` is either a local directory path or an `https://` git URL.
95 /// Local paths prefer the persisted `.veles/` index when one exists
96 /// (fast load) and fall back to a fresh in-memory build otherwise.
97 pub async fn get_or_load(
98 &self,
99 repo: &str,
100 include_text_files: bool,
101 ) -> Result<Arc<RwLock<VelesIndex>>> {
102 // Take or create the cell, update LRU timestamp. The shard lock
103 // is held only for this `entry()` call — building runs outside.
104 let cell = {
105 let entry = self
106 .entries
107 .entry(repo.to_string())
108 .or_insert_with(|| CacheEntry {
109 cell: Arc::new(OnceCell::new()),
110 last_access: AtomicU64::new(0),
111 });
112 entry.last_access.store(self.tick(), Ordering::Relaxed);
113 entry.cell.clone()
114 };
115
116 // Initialise the cell. `get_or_try_init` ensures exactly one
117 // caller runs the closure; others await its result. On error
118 // the cell stays empty so the next call retries.
119 let index = cell
120 .get_or_try_init(|| async {
121 let built = self.build_index(repo, include_text_files)?;
122 anyhow::Ok(Arc::new(RwLock::new(built)))
123 })
124 .await
125 .map_err(|e| anyhow::anyhow!("failed to load {repo}: {e}"))?;
126
127 // Opportunistic LRU eviction. Done after insert (not before) so
128 // we never evict a fresh entry that's about to be returned.
129 if self.entries.len() > self.capacity {
130 self.evict_lru();
131 }
132
133 Ok(index.clone())
134 }
135
136 /// Look up `repo` without building. Returns `Some` only if the
137 /// cell has been initialised (i.e. a previous `get_or_load` for
138 /// this repo has completed successfully). Used by callers that
139 /// want to gate on "is this repo bootstrapped yet?" without
140 /// triggering an expensive build — e.g. the gRPC `GetStats` RPC.
141 pub fn peek(&self, repo: &str) -> Option<Arc<RwLock<VelesIndex>>> {
142 let entry = self.entries.get(repo)?;
143 entry.last_access.store(self.tick(), Ordering::Relaxed);
144 entry.cell.get().cloned()
145 }
146
147 /// Drop the cached entry for `repo` if present. Useful for tests
148 /// and explicit invalidation.
149 pub fn invalidate(&self, repo: &str) -> bool {
150 self.entries.remove(repo).is_some()
151 }
152
153 /// Current number of cached repos.
154 pub fn len(&self) -> usize {
155 self.entries.len()
156 }
157
158 /// True if no repos are cached.
159 pub fn is_empty(&self) -> bool {
160 self.entries.is_empty()
161 }
162
163 /// Configured capacity.
164 pub fn capacity(&self) -> usize {
165 self.capacity
166 }
167
168 fn tick(&self) -> u64 {
169 self.counter.fetch_add(1, Ordering::Relaxed)
170 }
171
172 /// Walk every entry to find the smallest `last_access` and drop it.
173 /// O(N) on the cache size — fine because N is bounded to `capacity`
174 /// (default 10).
175 fn evict_lru(&self) {
176 let oldest = self
177 .entries
178 .iter()
179 .min_by_key(|e| e.value().last_access.load(Ordering::Relaxed))
180 .map(|e| e.key().clone());
181 if let Some(key) = oldest {
182 self.entries.remove(&key);
183 }
184 }
185
186 /// Build a `VelesIndex` for `repo`. Synchronous, CPU-bound — runs
187 /// inside the `OnceCell::get_or_try_init` future. A future refactor
188 /// (`spawn_blocking`) can offload this from the tokio worker; for
189 /// now it matches the legacy MCP / gRPC behaviour.
190 fn build_index(&self, repo: &str, include_text_files: bool) -> Result<VelesIndex> {
191 let model = self.model.clone();
192 let path = Path::new(repo);
193
194 if path.is_dir() {
195 // Persisted index wins over a fresh build: keeps subsequent
196 // `stats` / `status` / `update` consistent with the on-disk
197 // chunk count, and avoids re-embedding on every cold start.
198 // Fall back to a fresh build if load fails (incompatible
199 // format, missing sidecar files, ...).
200 if persist::index_exists(path) {
201 match VelesIndex::load(path, model.clone()) {
202 Ok(idx) => return Ok(idx),
203 Err(_) => {
204 // load failed — fall through to fresh build
205 }
206 }
207 }
208 VelesIndex::from_path(path, Some(model), None, include_text_files)
209 } else if repo.starts_with("https://") || repo.starts_with("http://") {
210 VelesIndex::from_git(repo, None, Some(model), include_text_files)
211 } else {
212 bail!("Invalid repo: must be a local directory or https:// URL")
213 }
214 }
215}
216
#[cfg(test)]
mod tests {
    use super::*;

    /// Load the embedding model shared by every test below.
    fn test_model() -> StaticModel {
        crate::model::load_model(None).expect("test model load")
    }

    #[tokio::test]
    async fn caches_same_repo_across_calls() {
        let cache = IndexCache::new(test_model());
        let workspace = tempfile::tempdir().unwrap();
        std::fs::write(workspace.path().join("a.rs"), "fn hello() {}\n").unwrap();
        let repo = workspace.path().to_string_lossy().into_owned();

        let first = cache.get_or_load(&repo, false).await.unwrap();
        let second = cache.get_or_load(&repo, false).await.unwrap();
        // Same Arc back on the second call ⇒ the index was served from cache.
        assert!(Arc::ptr_eq(&first, &second), "cache miss on repeat lookup");
        assert_eq!(cache.len(), 1);
    }

    #[tokio::test]
    async fn evicts_lru_when_over_capacity() {
        let cache = IndexCache::with_capacity(test_model(), 2);

        // Three single-file repos; keep the tempdirs alive for the test.
        let mut dirs = Vec::new();
        let mut paths = Vec::new();
        for i in 0..3 {
            let dir = tempfile::tempdir().unwrap();
            std::fs::write(dir.path().join("a.rs"), format!("fn fn_{i}() {{}}\n")).unwrap();
            paths.push(dir.path().to_string_lossy().into_owned());
            dirs.push(dir);
        }

        // Load three repos into a 2-slot cache.
        cache.get_or_load(&paths[0], false).await.unwrap();
        cache.get_or_load(&paths[1], false).await.unwrap();
        // Re-touch [0] so it's newer than [1].
        cache.get_or_load(&paths[0], false).await.unwrap();
        // Inserting [2] should evict the LRU — which is now [1].
        cache.get_or_load(&paths[2], false).await.unwrap();

        assert_eq!(cache.len(), 2);
        // [1] was evicted; [0] and [2] remain.
        assert!(cache.entries.contains_key(&paths[0]));
        assert!(cache.entries.contains_key(&paths[2]));
        assert!(!cache.entries.contains_key(&paths[1]));
    }

    #[tokio::test]
    async fn invalidate_removes_entry() {
        let cache = IndexCache::new(test_model());
        let workspace = tempfile::tempdir().unwrap();
        std::fs::write(workspace.path().join("a.rs"), "fn x() {}\n").unwrap();
        let repo = workspace.path().to_string_lossy().into_owned();

        cache.get_or_load(&repo, false).await.unwrap();
        assert!(cache.invalidate(&repo));
        assert!(cache.is_empty());
        // Idempotent — second invalidate is a no-op.
        assert!(!cache.invalidate(&repo));
    }
}