// veles_core/cache.rs

//! Process-wide lockfree-ish cache of loaded `VelesIndex`es.
//!
//! Shared between the MCP and gRPC servers (and anything else that
//! wants to serve searches from in-memory indexes without re-walking
//! `<repo>/.veles` per request).
//!
//! Design notes:
//!
//! - **Storage**: `DashMap<String, CacheEntry>` — sharded internal
//!   locks, so concurrent operations on different repos never contend.
//!   The "lockfree" label is the practical kind: contention is bounded
//!   to a single shard, not the whole map.
//!
//! - **Per-index synchronization**: each cache entry holds an
//!   `Arc<RwLock<VelesIndex>>`. Read-only operations (search, defs,
//!   refs, ...) take a shared read lock; `update_from_path` takes the
//!   exclusive write lock. Two clients searching the same repo proceed
//!   in parallel; an `update` briefly blocks readers.
//!
//! - **Build deduplication**: a `OnceCell` lives inside every entry,
//!   so several concurrent loaders of the same repo cooperate — one
//!   thread does the (slow) walk + embed + load, the others await its
//!   result. No wasted duplicate builds.
//!
//! - **LRU eviction**: each entry stores an `AtomicU64 last_access`.
//!   Every hit / miss bumps a global counter and writes it into the
//!   entry. When the cache exceeds capacity we scan and evict the
//!   entry with the smallest `last_access`. O(N), but N is small
//!   (≤ 16 in practice).
//!
//! Tests assume the eviction is "eventually correct", not strictly LRU
//! under contention — two threads concurrently bumping `last_access`
//! on different entries may finish in arbitrary order. For the actual
//! workload (10-slot cache, ~tens of repos per session) this is fine.
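//!
//! A minimal usage sketch (hedged: assumes an async context and a
//! public model loader like the `crate::model::load_model` used in
//! this file's tests):
//!
//! ```ignore
//! let model = veles_core::model::load_model(None)?;
//! let cache = IndexCache::new(model);
//!
//! // First call walks + embeds (or loads `.veles/`); later calls and
//! // concurrent racers share the cached `Arc<RwLock<VelesIndex>>`.
//! let index = cache.get_or_load("/path/to/repo", false).await?;
//! let idx = index.read().await; // shared read lock: search, defs, refs, ...
//! ```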

use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use anyhow::{Result, bail};
use dashmap::DashMap;
use model2vec_rs::model::StaticModel;
use tokio::sync::{OnceCell, RwLock};

use crate::VelesIndex;
use crate::persist;

/// How many `VelesIndex` entries the cache keeps before evicting LRU.
pub const DEFAULT_CACHE_SIZE: usize = 10;

/// One cached index plus the metadata we need for LRU + build dedup.
struct CacheEntry {
    /// Initialised lazily: the first `get_or_load` for this repo
    /// triggers the build; concurrent callers await the same future
    /// rather than launching their own. `Arc` on the outside so we
    /// can hand it back to callers cheaply after init.
    cell: Arc<OnceCell<Arc<RwLock<VelesIndex>>>>,
    /// Monotonic counter snapshot of the last hit / miss touching this
    /// entry. Newer = larger. Updated lockfree via relaxed store.
    last_access: AtomicU64,
}

/// Lockfree-ish process cache of loaded indexes.
pub struct IndexCache {
    entries: DashMap<String, CacheEntry>,
    model: StaticModel,
    capacity: usize,
    /// Global monotonic clock for LRU ordering.
    counter: AtomicU64,
}

impl IndexCache {
    /// Build a cache with the default capacity (`DEFAULT_CACHE_SIZE`).
    pub fn new(model: StaticModel) -> Self {
        Self::with_capacity(model, DEFAULT_CACHE_SIZE)
    }

    /// Build a cache with an explicit capacity.
    pub fn with_capacity(model: StaticModel, capacity: usize) -> Self {
        Self {
            entries: DashMap::with_capacity(capacity.max(1)),
            model,
            capacity: capacity.max(1),
            counter: AtomicU64::new(0),
        }
    }

    /// Get or lazily build the `VelesIndex` for `repo`.
    ///
    /// Returns an `Arc<RwLock<VelesIndex>>` the caller can `.read()` or
    /// `.write()` independently of the cache lock. Multiple concurrent
    /// loaders of the same repo share a single in-flight build via the
    /// internal `OnceCell`.
    ///
    /// `repo` is either a local directory path or an `http(s)://` git
    /// URL. Local paths prefer the persisted `.veles/` index when one
    /// exists (fast load) and fall back to a fresh in-memory build
    /// otherwise.
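    ///
    /// A hedged sketch of using the returned handle (the lock is per
    /// index, independent of the cache's shard locks):
    ///
    /// ```ignore
    /// let index = cache.get_or_load(repo, false).await?;
    /// {
    ///     // Shared read: many clients search the same repo in parallel.
    ///     let idx = index.read().await;
    ///     // ... search / defs / refs on `idx` ...
    /// }
    /// // Exclusive write, e.g. an update, briefly blocks readers.
    /// let mut idx = index.write().await;
    /// ```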
    pub async fn get_or_load(
        &self,
        repo: &str,
        include_text_files: bool,
    ) -> Result<Arc<RwLock<VelesIndex>>> {
        // Take or create the cell, update LRU timestamp. The shard lock
        // is held only for this `entry()` call — building runs outside.
        let cell = {
            let entry = self
                .entries
                .entry(repo.to_string())
                .or_insert_with(|| CacheEntry {
                    cell: Arc::new(OnceCell::new()),
                    last_access: AtomicU64::new(0),
                });
            entry.last_access.store(self.tick(), Ordering::Relaxed);
            entry.cell.clone()
        };

        // Initialise the cell. `get_or_try_init` ensures exactly one
        // caller runs the closure; others await its result. On error
        // the cell stays empty so the next call retries.
        let index = cell
            .get_or_try_init(|| async {
                let built = self.build_index(repo, include_text_files)?;
                anyhow::Ok(Arc::new(RwLock::new(built)))
            })
            .await
            .map_err(|e| anyhow::anyhow!("failed to load {repo}: {e}"))?;

        // Opportunistic LRU eviction. Done after insert (not before) so
        // we never evict a fresh entry that's about to be returned.
        if self.entries.len() > self.capacity {
            self.evict_lru();
        }

        Ok(index.clone())
    }

    /// Look up `repo` without building. Returns `Some` only if the
    /// cell has been initialised (i.e. a previous `get_or_load` for
    /// this repo has completed successfully). Used by callers that
    /// want to gate on "is this repo bootstrapped yet?" without
    /// triggering an expensive build — e.g. the gRPC `GetStats` RPC.
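    ///
    /// A hedged gating sketch (`stats_from` and `pending_response` are
    /// illustrative stand-ins, not real APIs):
    ///
    /// ```ignore
    /// match cache.peek(repo) {
    ///     Some(index) => stats_from(&*index.read().await),
    ///     None => pending_response(), // not bootstrapped; no build triggered
    /// }
    /// ```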
    pub fn peek(&self, repo: &str) -> Option<Arc<RwLock<VelesIndex>>> {
        let entry = self.entries.get(repo)?;
        entry.last_access.store(self.tick(), Ordering::Relaxed);
        entry.cell.get().cloned()
    }

    /// Drop the cached entry for `repo` if present. Useful for tests
    /// and explicit invalidation.
    pub fn invalidate(&self, repo: &str) -> bool {
        self.entries.remove(repo).is_some()
    }

    /// Current number of cached repos.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// True if no repos are cached.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Configured capacity.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    fn tick(&self) -> u64 {
        self.counter.fetch_add(1, Ordering::Relaxed)
    }

    /// Walk every entry to find the smallest `last_access` and drop it.
    /// O(N) on the cache size — fine because N is bounded to `capacity`
    /// (default 10).
    fn evict_lru(&self) {
        let oldest = self
            .entries
            .iter()
            .min_by_key(|e| e.value().last_access.load(Ordering::Relaxed))
            .map(|e| e.key().clone());
        if let Some(key) = oldest {
            self.entries.remove(&key);
        }
    }

    /// Build a `VelesIndex` for `repo`. Synchronous, CPU-bound — runs
    /// inside the `OnceCell::get_or_try_init` future. A future refactor
    /// (`spawn_blocking`) can offload this from the tokio worker; for
    /// now it matches the legacy MCP / gRPC behaviour.
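    ///
    /// A sketch of that hypothetical offload (`build_owned` is an
    /// illustrative stand-in for a `'static` version of this body):
    ///
    /// ```ignore
    /// let model = self.model.clone();
    /// let repo = repo.to_string();
    /// let built = tokio::task::spawn_blocking(move || {
    ///     build_owned(&repo, model, include_text_files)
    /// })
    /// .await??; // JoinError first, then the build's own Result
    /// ```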
    fn build_index(&self, repo: &str, include_text_files: bool) -> Result<VelesIndex> {
        let model = self.model.clone();
        let path = Path::new(repo);

        if path.is_dir() {
            // Persisted index wins over a fresh build: keeps subsequent
            // `stats` / `status` / `update` consistent with the on-disk
            // chunk count, and avoids re-embedding on every cold start.
            // Fall back to a fresh build if load fails (incompatible
            // format, missing sidecar files, ...).
            if persist::index_exists(path) {
                if let Ok(idx) = VelesIndex::load(path, model.clone()) {
                    return Ok(idx);
                }
                // load failed — fall through to a fresh build
            }
            VelesIndex::from_path(path, Some(model), None, include_text_files)
        } else if repo.starts_with("https://") || repo.starts_with("http://") {
            VelesIndex::from_git(repo, None, Some(model), include_text_files)
        } else {
            bail!("Invalid repo: must be a local directory or http(s):// git URL")
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn test_model() -> StaticModel {
        crate::model::load_model(None).expect("test model load")
    }

    #[tokio::test]
    async fn caches_same_repo_across_calls() {
        let cache = IndexCache::new(test_model());
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("a.rs"), "fn hello() {}\n").unwrap();
        let repo = dir.path().to_string_lossy().into_owned();

        let a = cache.get_or_load(&repo, false).await.unwrap();
        let b = cache.get_or_load(&repo, false).await.unwrap();
        assert!(Arc::ptr_eq(&a, &b), "cache miss on repeat lookup");
        assert_eq!(cache.len(), 1);
    }
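
    // A hedged sketch exercising the `OnceCell` build dedup described
    // in the module docs: concurrent loaders of one repo share a single
    // in-flight build and receive the same handle.
    #[tokio::test]
    async fn concurrent_loads_share_one_entry() {
        let cache = IndexCache::new(test_model());
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("a.rs"), "fn y() {}\n").unwrap();
        let repo = dir.path().to_string_lossy().into_owned();

        // Race two loads through the same cell.
        let (a, b) = tokio::join!(
            cache.get_or_load(&repo, false),
            cache.get_or_load(&repo, false)
        );
        assert!(Arc::ptr_eq(&a.unwrap(), &b.unwrap()));
        assert_eq!(cache.len(), 1);
    }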

    #[tokio::test]
    async fn evicts_lru_when_over_capacity() {
        let cache = IndexCache::with_capacity(test_model(), 2);
        let dirs: Vec<_> = (0..3)
            .map(|i| {
                let d = tempfile::tempdir().unwrap();
                std::fs::write(d.path().join("a.rs"), format!("fn fn_{i}() {{}}\n")).unwrap();
                d
            })
            .collect();
        let paths: Vec<String> = dirs
            .iter()
            .map(|d| d.path().to_string_lossy().into_owned())
            .collect();

        // Load three repos into a 2-slot cache.
        let _ = cache.get_or_load(&paths[0], false).await.unwrap();
        let _ = cache.get_or_load(&paths[1], false).await.unwrap();
        // Re-touch [0] so it's newer than [1].
        let _ = cache.get_or_load(&paths[0], false).await.unwrap();
        // Inserting [2] should evict the LRU — which is now [1].
        let _ = cache.get_or_load(&paths[2], false).await.unwrap();

        assert_eq!(cache.len(), 2);
        // [1] was evicted; [0] and [2] remain.
        assert!(cache.entries.contains_key(&paths[0]));
        assert!(cache.entries.contains_key(&paths[2]));
        assert!(!cache.entries.contains_key(&paths[1]));
    }

    #[tokio::test]
    async fn invalidate_removes_entry() {
        let cache = IndexCache::new(test_model());
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("a.rs"), "fn x() {}\n").unwrap();
        let repo = dir.path().to_string_lossy().into_owned();
        let _ = cache.get_or_load(&repo, false).await.unwrap();
        assert!(cache.invalidate(&repo));
        assert!(cache.is_empty());
        // Idempotent — second invalidate is a no-op.
        assert!(!cache.invalidate(&repo));
    }
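
    // A hedged sketch of `peek`'s no-build contract: before any
    // `get_or_load`, `peek` returns `None` without inserting an entry
    // or triggering a build.
    #[tokio::test]
    async fn peek_never_builds() {
        let cache = IndexCache::new(test_model());
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("a.rs"), "fn z() {}\n").unwrap();
        let repo = dir.path().to_string_lossy().into_owned();

        assert!(cache.peek(&repo).is_none());
        assert!(cache.is_empty());
        let _ = cache.get_or_load(&repo, false).await.unwrap();
        assert!(cache.peek(&repo).is_some());
    }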
}