mcpr_core/protocol/schema_manager/
store.rs1use std::future::Future;
12use std::sync::Arc;
13
14use dashmap::DashMap;
15
16use super::version::{SchemaVersion, SchemaVersionId};
17
18pub trait SchemaStore: Send + Sync + 'static {
23 fn put_version(&self, version: SchemaVersion) -> impl Future<Output = SchemaVersion> + Send;
27
28 fn get_version(
30 &self,
31 id: &SchemaVersionId,
32 ) -> impl Future<Output = Option<SchemaVersion>> + Send;
33
34 fn latest_version_for_method(
36 &self,
37 upstream_id: &str,
38 method: &str,
39 ) -> impl Future<Output = Option<SchemaVersion>> + Send;
40
41 fn list_versions(
44 &self,
45 upstream_id: &str,
46 method: &str,
47 ) -> impl Future<Output = Vec<SchemaVersion>> + Send;
48
49 fn prune(
53 &self,
54 _upstream_id: &str,
55 _method: &str,
56 _keep: usize,
57 ) -> impl Future<Output = ()> + Send {
58 async {}
59 }
60}
61
62type VersionKey = (String, String); #[derive(Clone)]
70pub struct MemorySchemaStore {
71 by_key: Arc<DashMap<VersionKey, Vec<SchemaVersion>>>,
72 index: Arc<DashMap<SchemaVersionId, VersionKey>>,
73 capacity: usize,
74}
75
76impl MemorySchemaStore {
77 pub fn new() -> Self {
78 Self::with_capacity(20)
79 }
80
81 pub fn with_capacity(capacity: usize) -> Self {
82 assert!(capacity > 0, "MemorySchemaStore capacity must be > 0");
83 Self {
84 by_key: Arc::new(DashMap::new()),
85 index: Arc::new(DashMap::new()),
86 capacity,
87 }
88 }
89}
90
91impl Default for MemorySchemaStore {
92 fn default() -> Self {
93 Self::new()
94 }
95}
96
97impl SchemaStore for MemorySchemaStore {
98 async fn put_version(&self, version: SchemaVersion) -> SchemaVersion {
99 let key: VersionKey = (version.upstream_id.clone(), version.method.clone());
100 let id = version.id.clone();
101
102 let mut entry = self.by_key.entry(key.clone()).or_default();
103 entry.push(version.clone());
104 while entry.len() > self.capacity {
105 let evicted = entry.remove(0);
106 self.index.remove(&evicted.id);
107 }
108 drop(entry);
109
110 self.index.insert(id, key);
111 version
112 }
113
114 async fn get_version(&self, id: &SchemaVersionId) -> Option<SchemaVersion> {
115 let key = self.index.get(id)?.clone();
116 let ring = self.by_key.get(&key)?;
117 ring.iter().find(|v| &v.id == id).cloned()
118 }
119
120 async fn latest_version_for_method(
121 &self,
122 upstream_id: &str,
123 method: &str,
124 ) -> Option<SchemaVersion> {
125 let key = (upstream_id.to_string(), method.to_string());
126 let ring = self.by_key.get(&key)?;
127 ring.last().cloned()
128 }
129
130 async fn list_versions(&self, upstream_id: &str, method: &str) -> Vec<SchemaVersion> {
131 let key = (upstream_id.to_string(), method.to_string());
132 match self.by_key.get(&key) {
133 Some(ring) => ring.iter().rev().cloned().collect(),
134 None => Vec::new(),
135 }
136 }
137
138 async fn prune(&self, upstream_id: &str, method: &str, keep: usize) {
139 let key = (upstream_id.to_string(), method.to_string());
140 let Some(mut entry) = self.by_key.get_mut(&key) else {
141 return;
142 };
143 while entry.len() > keep {
144 let evicted = entry.remove(0);
145 self.index.remove(&evicted.id);
146 }
147 }
148}
149
150#[cfg(test)]
151#[allow(non_snake_case)]
152mod tests {
153 use super::*;
154 use chrono::Utc;
155 use serde_json::json;
156
157 fn version(upstream: &str, method: &str, version: u32, payload_tag: &str) -> SchemaVersion {
158 let hash = format!("{method}-{version}-{payload_tag}");
159 SchemaVersion {
160 id: SchemaVersionId(hash[..hash.len().min(16)].to_string()),
161 upstream_id: upstream.to_string(),
162 method: method.to_string(),
163 version,
164 payload: Arc::new(json!({"tag": payload_tag})),
165 content_hash: hash,
166 captured_at: Utc::now(),
167 }
168 }
169
170 #[tokio::test]
171 async fn put_and_get_version__roundtrip() {
172 let store = MemorySchemaStore::new();
173 let v = version("p1", "tools/list", 1, "a");
174 let stored = store.put_version(v.clone()).await;
175 assert_eq!(stored.id, v.id);
176
177 let fetched = store.get_version(&v.id).await.unwrap();
178 assert_eq!(fetched.id, v.id);
179 assert_eq!(fetched.version, 1);
180 }
181
182 #[tokio::test]
183 async fn latest_version_for_method__returns_newest() {
184 let store = MemorySchemaStore::new();
185 store.put_version(version("p1", "tools/list", 1, "a")).await;
186 store.put_version(version("p1", "tools/list", 2, "b")).await;
187 store.put_version(version("p1", "tools/list", 3, "c")).await;
188
189 let latest = store
190 .latest_version_for_method("p1", "tools/list")
191 .await
192 .unwrap();
193 assert_eq!(latest.version, 3);
194 }
195
196 #[tokio::test]
197 async fn latest_version_for_method__none_when_empty() {
198 let store = MemorySchemaStore::new();
199 assert!(
200 store
201 .latest_version_for_method("p1", "tools/list")
202 .await
203 .is_none()
204 );
205 }
206
207 #[tokio::test]
208 async fn list_versions__newest_first() {
209 let store = MemorySchemaStore::new();
210 for i in 1..=3 {
211 store
212 .put_version(version("p1", "tools/list", i, &i.to_string()))
213 .await;
214 }
215 let all = store.list_versions("p1", "tools/list").await;
216 let nums: Vec<u32> = all.iter().map(|v| v.version).collect();
217 assert_eq!(nums, vec![3, 2, 1]);
218 }
219
220 #[tokio::test]
221 async fn ring_buffer__evicts_oldest_at_capacity() {
222 let store = MemorySchemaStore::with_capacity(3);
223 for i in 1..=5 {
224 store
225 .put_version(version("p1", "tools/list", i, &i.to_string()))
226 .await;
227 }
228 let all = store.list_versions("p1", "tools/list").await;
229 let nums: Vec<u32> = all.iter().map(|v| v.version).collect();
230 assert_eq!(nums, vec![5, 4, 3]);
231 }
232
233 #[tokio::test]
234 async fn ring_buffer__get_version_returns_none_after_eviction() {
235 let store = MemorySchemaStore::with_capacity(2);
236 let v1 = version("p1", "tools/list", 1, "a");
237 store.put_version(v1.clone()).await;
238 store.put_version(version("p1", "tools/list", 2, "b")).await;
239 store.put_version(version("p1", "tools/list", 3, "c")).await;
240
241 assert!(store.get_version(&v1.id).await.is_none());
242 }
243
244 #[tokio::test]
245 async fn prune__trims_to_keep_count() {
246 let store = MemorySchemaStore::new();
247 for i in 1..=5 {
248 store
249 .put_version(version("p1", "tools/list", i, &i.to_string()))
250 .await;
251 }
252 store.prune("p1", "tools/list", 2).await;
253 let all = store.list_versions("p1", "tools/list").await;
254 let nums: Vec<u32> = all.iter().map(|v| v.version).collect();
255 assert_eq!(nums, vec![5, 4]);
256 }
257
258 #[tokio::test]
259 async fn prune__noop_when_keep_exceeds_size() {
260 let store = MemorySchemaStore::new();
261 for i in 1..=3 {
262 store
263 .put_version(version("p1", "tools/list", i, &i.to_string()))
264 .await;
265 }
266 store.prune("p1", "tools/list", 10).await;
267 let all = store.list_versions("p1", "tools/list").await;
268 assert_eq!(all.len(), 3);
269 }
270
271 #[tokio::test]
272 async fn different_methods__isolated() {
273 let store = MemorySchemaStore::new();
274 store.put_version(version("p1", "tools/list", 1, "t")).await;
275 store
276 .put_version(version("p1", "prompts/list", 1, "p"))
277 .await;
278
279 assert_eq!(store.list_versions("p1", "tools/list").await.len(), 1);
280 assert_eq!(store.list_versions("p1", "prompts/list").await.len(), 1);
281 }
282
283 #[tokio::test]
284 async fn different_upstreams__isolated() {
285 let store = MemorySchemaStore::new();
286 store.put_version(version("p1", "tools/list", 1, "a")).await;
287 store.put_version(version("p2", "tools/list", 1, "b")).await;
288
289 let p1_latest = store
290 .latest_version_for_method("p1", "tools/list")
291 .await
292 .unwrap();
293 let p2_latest = store
294 .latest_version_for_method("p2", "tools/list")
295 .await
296 .unwrap();
297
298 assert_eq!(p1_latest.upstream_id, "p1");
299 assert_eq!(p2_latest.upstream_id, "p2");
300 }
301
302 #[test]
303 #[should_panic(expected = "capacity must be > 0")]
304 fn with_capacity__rejects_zero() {
305 let _ = MemorySchemaStore::with_capacity(0);
306 }
307}