Skip to main content

mcpr_core/protocol/schema_manager/
store.rs

1//! `SchemaStore` — pluggable persistence for captured `SchemaVersion`s.
2//!
3//! Mirrors the `SessionStore` pattern: return-position `impl Future`
4//! signatures, generic at call sites. Not `dyn`-compatible by design.
5//
6// TODO(step-2): content-addressed item dedup pool. Today the in-memory
7// store holds whole payloads per version; items that appear across
8// versions are cloned. A later step can intern tool / resource / prompt
9// entries behind the same `SchemaStore` interface.
10
11use std::future::Future;
12use std::sync::Arc;
13
14use dashmap::DashMap;
15
16use super::version::{SchemaVersion, SchemaVersionId};
17
18/// Trait for schema version storage backends.
19///
20/// Implementations must be `Send + Sync + 'static` so a single store
21/// can back many concurrent `SchemaManager` readers.
22pub trait SchemaStore: Send + Sync + 'static {
23    /// Insert a new version. Implementations handle retention / eviction.
24    /// Returns the stored version (the caller may not have populated
25    /// `version` correctly — implementations may assign it).
26    fn put_version(&self, version: SchemaVersion) -> impl Future<Output = SchemaVersion> + Send;
27
28    /// Fetch a specific version by id. `None` if never stored or evicted.
29    fn get_version(
30        &self,
31        id: &SchemaVersionId,
32    ) -> impl Future<Output = Option<SchemaVersion>> + Send;
33
34    /// Latest version for a given `(upstream_id, method)`.
35    fn latest_version_for_method(
36        &self,
37        upstream_id: &str,
38        method: &str,
39    ) -> impl Future<Output = Option<SchemaVersion>> + Send;
40
41    /// All versions for a given `(upstream_id, method)`, newest first.
42    /// Bounded by the store's retention policy.
43    fn list_versions(
44        &self,
45        upstream_id: &str,
46        method: &str,
47    ) -> impl Future<Output = Vec<SchemaVersion>> + Send;
48
49    /// Drop versions older than `keep` for `(upstream_id, method)`.
50    /// Idempotent. Default impl is a no-op for stores without explicit
51    /// pruning.
52    fn prune(
53        &self,
54        _upstream_id: &str,
55        _method: &str,
56        _keep: usize,
57    ) -> impl Future<Output = ()> + Send {
58        async {}
59    }
60}
61
/// Composite map key: `(upstream_id, method)`.
type VersionKey = (String, String);
63
64/// In-memory `SchemaStore` backed by DashMap.
65///
66/// Each `(upstream_id, method)` key owns a FIFO ring buffer of at most
67/// `capacity` versions. On insert overflow, the oldest version is
68/// dropped and its id removed from the lookup index.
69#[derive(Clone)]
70pub struct MemorySchemaStore {
71    by_key: Arc<DashMap<VersionKey, Vec<SchemaVersion>>>,
72    index: Arc<DashMap<SchemaVersionId, VersionKey>>,
73    capacity: usize,
74}
75
76impl MemorySchemaStore {
77    pub fn new() -> Self {
78        Self::with_capacity(20)
79    }
80
81    pub fn with_capacity(capacity: usize) -> Self {
82        assert!(capacity > 0, "MemorySchemaStore capacity must be > 0");
83        Self {
84            by_key: Arc::new(DashMap::new()),
85            index: Arc::new(DashMap::new()),
86            capacity,
87        }
88    }
89}
90
91impl Default for MemorySchemaStore {
92    fn default() -> Self {
93        Self::new()
94    }
95}
96
97impl SchemaStore for MemorySchemaStore {
98    async fn put_version(&self, version: SchemaVersion) -> SchemaVersion {
99        let key: VersionKey = (version.upstream_id.clone(), version.method.clone());
100        let id = version.id.clone();
101
102        let mut entry = self.by_key.entry(key.clone()).or_default();
103        entry.push(version.clone());
104        while entry.len() > self.capacity {
105            let evicted = entry.remove(0);
106            self.index.remove(&evicted.id);
107        }
108        drop(entry);
109
110        self.index.insert(id, key);
111        version
112    }
113
114    async fn get_version(&self, id: &SchemaVersionId) -> Option<SchemaVersion> {
115        let key = self.index.get(id)?.clone();
116        let ring = self.by_key.get(&key)?;
117        ring.iter().find(|v| &v.id == id).cloned()
118    }
119
120    async fn latest_version_for_method(
121        &self,
122        upstream_id: &str,
123        method: &str,
124    ) -> Option<SchemaVersion> {
125        let key = (upstream_id.to_string(), method.to_string());
126        let ring = self.by_key.get(&key)?;
127        ring.last().cloned()
128    }
129
130    async fn list_versions(&self, upstream_id: &str, method: &str) -> Vec<SchemaVersion> {
131        let key = (upstream_id.to_string(), method.to_string());
132        match self.by_key.get(&key) {
133            Some(ring) => ring.iter().rev().cloned().collect(),
134            None => Vec::new(),
135        }
136    }
137
138    async fn prune(&self, upstream_id: &str, method: &str, keep: usize) {
139        let key = (upstream_id.to_string(), method.to_string());
140        let Some(mut entry) = self.by_key.get_mut(&key) else {
141            return;
142        };
143        while entry.len() > keep {
144            let evicted = entry.remove(0);
145            self.index.remove(&evicted.id);
146        }
147    }
148}
149
#[cfg(test)]
// Test names follow `subject__expectation`; the double underscore would
// otherwise trip the `non_snake_case` lint.
#[allow(non_snake_case)]
mod tests {
    use super::*;
    use chrono::Utc;
    use serde_json::json;

    /// Builds a `SchemaVersion` fixture. The content hash is
    /// `"{method}-{version}-{payload_tag}"` and the id is its first 16
    /// bytes (or the whole hash when shorter).
    fn version(upstream: &str, method: &str, version: u32, payload_tag: &str) -> SchemaVersion {
        let content_hash = format!("{method}-{version}-{payload_tag}");
        let id_len = content_hash.len().min(16);
        let id = SchemaVersionId(content_hash[..id_len].to_string());
        SchemaVersion {
            id,
            upstream_id: upstream.to_string(),
            method: method.to_string(),
            version,
            payload: Arc::new(json!({"tag": payload_tag})),
            content_hash,
            captured_at: Utc::now(),
        }
    }

    /// Inserts versions `1..=count` for `("p1", "tools/list")`, each
    /// tagged with its own version number.
    async fn seed_tools_list(store: &MemorySchemaStore, count: u32) {
        for n in 1..=count {
            let fixture = version("p1", "tools/list", n, &n.to_string());
            store.put_version(fixture).await;
        }
    }

    /// Version numbers currently listed for `("p1", "tools/list")`, in
    /// the order the store returns them.
    async fn listed_numbers(store: &MemorySchemaStore) -> Vec<u32> {
        store
            .list_versions("p1", "tools/list")
            .await
            .iter()
            .map(|v| v.version)
            .collect()
    }

    #[tokio::test]
    async fn put_and_get_version__roundtrip() {
        let store = MemorySchemaStore::new();
        let original = version("p1", "tools/list", 1, "a");

        let stored = store.put_version(original.clone()).await;
        assert_eq!(stored.id, original.id);

        let fetched = store.get_version(&original.id).await.unwrap();
        assert_eq!(fetched.id, original.id);
        assert_eq!(fetched.version, 1);
    }

    #[tokio::test]
    async fn latest_version_for_method__returns_newest() {
        let store = MemorySchemaStore::new();
        for (number, tag) in [(1, "a"), (2, "b"), (3, "c")] {
            store
                .put_version(version("p1", "tools/list", number, tag))
                .await;
        }

        let newest = store.latest_version_for_method("p1", "tools/list").await;
        assert_eq!(newest.unwrap().version, 3);
    }

    #[tokio::test]
    async fn latest_version_for_method__none_when_empty() {
        let store = MemorySchemaStore::new();
        let newest = store.latest_version_for_method("p1", "tools/list").await;
        assert!(newest.is_none());
    }

    #[tokio::test]
    async fn list_versions__newest_first() {
        let store = MemorySchemaStore::new();
        seed_tools_list(&store, 3).await;
        assert_eq!(listed_numbers(&store).await, vec![3, 2, 1]);
    }

    #[tokio::test]
    async fn ring_buffer__evicts_oldest_at_capacity() {
        let store = MemorySchemaStore::with_capacity(3);
        seed_tools_list(&store, 5).await;
        assert_eq!(listed_numbers(&store).await, vec![5, 4, 3]);
    }

    #[tokio::test]
    async fn ring_buffer__get_version_returns_none_after_eviction() {
        let store = MemorySchemaStore::with_capacity(2);
        let first = version("p1", "tools/list", 1, "a");
        store.put_version(first.clone()).await;
        for (number, tag) in [(2, "b"), (3, "c")] {
            store
                .put_version(version("p1", "tools/list", number, tag))
                .await;
        }

        assert!(store.get_version(&first.id).await.is_none());
    }

    #[tokio::test]
    async fn prune__trims_to_keep_count() {
        let store = MemorySchemaStore::new();
        seed_tools_list(&store, 5).await;

        store.prune("p1", "tools/list", 2).await;

        assert_eq!(listed_numbers(&store).await, vec![5, 4]);
    }

    #[tokio::test]
    async fn prune__noop_when_keep_exceeds_size() {
        let store = MemorySchemaStore::new();
        seed_tools_list(&store, 3).await;

        store.prune("p1", "tools/list", 10).await;

        let remaining = store.list_versions("p1", "tools/list").await;
        assert_eq!(remaining.len(), 3);
    }

    #[tokio::test]
    async fn different_methods__isolated() {
        let store = MemorySchemaStore::new();
        store.put_version(version("p1", "tools/list", 1, "t")).await;
        store
            .put_version(version("p1", "prompts/list", 1, "p"))
            .await;

        let tools = store.list_versions("p1", "tools/list").await;
        assert_eq!(tools.len(), 1);
        let prompts = store.list_versions("p1", "prompts/list").await;
        assert_eq!(prompts.len(), 1);
    }

    #[tokio::test]
    async fn different_upstreams__isolated() {
        let store = MemorySchemaStore::new();
        store.put_version(version("p1", "tools/list", 1, "a")).await;
        store.put_version(version("p2", "tools/list", 1, "b")).await;

        let from_p1 = store
            .latest_version_for_method("p1", "tools/list")
            .await
            .unwrap();
        let from_p2 = store
            .latest_version_for_method("p2", "tools/list")
            .await
            .unwrap();

        assert_eq!(from_p1.upstream_id, "p1");
        assert_eq!(from_p2.upstream_id, "p2");
    }

    #[test]
    #[should_panic(expected = "capacity must be > 0")]
    fn with_capacity__rejects_zero() {
        let _ = MemorySchemaStore::with_capacity(0);
    }
}
307}