use std::future::Future;
use std::sync::Arc;
use dashmap::DashMap;
use super::version::{SchemaVersion, SchemaVersionId};
pub trait SchemaStore: Send + Sync + 'static {
fn put_version(&self, version: SchemaVersion) -> impl Future<Output = SchemaVersion> + Send;
fn get_version(
&self,
id: &SchemaVersionId,
) -> impl Future<Output = Option<SchemaVersion>> + Send;
fn latest_version_for_method(
&self,
upstream_id: &str,
method: &str,
) -> impl Future<Output = Option<SchemaVersion>> + Send;
fn list_versions(
&self,
upstream_id: &str,
method: &str,
) -> impl Future<Output = Vec<SchemaVersion>> + Send;
fn prune(
&self,
_upstream_id: &str,
_method: &str,
_keep: usize,
) -> impl Future<Output = ()> + Send {
async {}
}
}
type VersionKey = (String, String);
#[derive(Clone)]
pub struct MemorySchemaStore {
by_key: Arc<DashMap<VersionKey, Vec<SchemaVersion>>>,
index: Arc<DashMap<SchemaVersionId, VersionKey>>,
capacity: usize,
}
impl MemorySchemaStore {
pub fn new() -> Self {
Self::with_capacity(20)
}
pub fn with_capacity(capacity: usize) -> Self {
assert!(capacity > 0, "MemorySchemaStore capacity must be > 0");
Self {
by_key: Arc::new(DashMap::new()),
index: Arc::new(DashMap::new()),
capacity,
}
}
}
impl Default for MemorySchemaStore {
fn default() -> Self {
Self::new()
}
}
impl SchemaStore for MemorySchemaStore {
async fn put_version(&self, version: SchemaVersion) -> SchemaVersion {
let key: VersionKey = (version.upstream_id.clone(), version.method.clone());
let id = version.id.clone();
let mut entry = self.by_key.entry(key.clone()).or_default();
entry.push(version.clone());
while entry.len() > self.capacity {
let evicted = entry.remove(0);
self.index.remove(&evicted.id);
}
drop(entry);
self.index.insert(id, key);
version
}
async fn get_version(&self, id: &SchemaVersionId) -> Option<SchemaVersion> {
let key = self.index.get(id)?.clone();
let ring = self.by_key.get(&key)?;
ring.iter().find(|v| &v.id == id).cloned()
}
async fn latest_version_for_method(
&self,
upstream_id: &str,
method: &str,
) -> Option<SchemaVersion> {
let key = (upstream_id.to_string(), method.to_string());
let ring = self.by_key.get(&key)?;
ring.last().cloned()
}
async fn list_versions(&self, upstream_id: &str, method: &str) -> Vec<SchemaVersion> {
let key = (upstream_id.to_string(), method.to_string());
match self.by_key.get(&key) {
Some(ring) => ring.iter().rev().cloned().collect(),
None => Vec::new(),
}
}
async fn prune(&self, upstream_id: &str, method: &str, keep: usize) {
let key = (upstream_id.to_string(), method.to_string());
let Some(mut entry) = self.by_key.get_mut(&key) else {
return;
};
while entry.len() > keep {
let evicted = entry.remove(0);
self.index.remove(&evicted.id);
}
}
}
#[cfg(test)]
#[allow(non_snake_case)]
mod tests {
use super::*;
use chrono::Utc;
use serde_json::json;
fn version(upstream: &str, method: &str, version: u32, payload_tag: &str) -> SchemaVersion {
let hash = format!("{method}-{version}-{payload_tag}");
SchemaVersion {
id: SchemaVersionId(hash[..hash.len().min(16)].to_string()),
upstream_id: upstream.to_string(),
method: method.to_string(),
version,
payload: Arc::new(json!({"tag": payload_tag})),
content_hash: hash,
captured_at: Utc::now(),
}
}
#[tokio::test]
async fn put_and_get_version__roundtrip() {
let store = MemorySchemaStore::new();
let v = version("p1", "tools/list", 1, "a");
let stored = store.put_version(v.clone()).await;
assert_eq!(stored.id, v.id);
let fetched = store.get_version(&v.id).await.unwrap();
assert_eq!(fetched.id, v.id);
assert_eq!(fetched.version, 1);
}
#[tokio::test]
async fn latest_version_for_method__returns_newest() {
let store = MemorySchemaStore::new();
store.put_version(version("p1", "tools/list", 1, "a")).await;
store.put_version(version("p1", "tools/list", 2, "b")).await;
store.put_version(version("p1", "tools/list", 3, "c")).await;
let latest = store
.latest_version_for_method("p1", "tools/list")
.await
.unwrap();
assert_eq!(latest.version, 3);
}
#[tokio::test]
async fn latest_version_for_method__none_when_empty() {
let store = MemorySchemaStore::new();
assert!(
store
.latest_version_for_method("p1", "tools/list")
.await
.is_none()
);
}
#[tokio::test]
async fn list_versions__newest_first() {
let store = MemorySchemaStore::new();
for i in 1..=3 {
store
.put_version(version("p1", "tools/list", i, &i.to_string()))
.await;
}
let all = store.list_versions("p1", "tools/list").await;
let nums: Vec<u32> = all.iter().map(|v| v.version).collect();
assert_eq!(nums, vec![3, 2, 1]);
}
#[tokio::test]
async fn ring_buffer__evicts_oldest_at_capacity() {
let store = MemorySchemaStore::with_capacity(3);
for i in 1..=5 {
store
.put_version(version("p1", "tools/list", i, &i.to_string()))
.await;
}
let all = store.list_versions("p1", "tools/list").await;
let nums: Vec<u32> = all.iter().map(|v| v.version).collect();
assert_eq!(nums, vec![5, 4, 3]);
}
#[tokio::test]
async fn ring_buffer__get_version_returns_none_after_eviction() {
let store = MemorySchemaStore::with_capacity(2);
let v1 = version("p1", "tools/list", 1, "a");
store.put_version(v1.clone()).await;
store.put_version(version("p1", "tools/list", 2, "b")).await;
store.put_version(version("p1", "tools/list", 3, "c")).await;
assert!(store.get_version(&v1.id).await.is_none());
}
#[tokio::test]
async fn prune__trims_to_keep_count() {
let store = MemorySchemaStore::new();
for i in 1..=5 {
store
.put_version(version("p1", "tools/list", i, &i.to_string()))
.await;
}
store.prune("p1", "tools/list", 2).await;
let all = store.list_versions("p1", "tools/list").await;
let nums: Vec<u32> = all.iter().map(|v| v.version).collect();
assert_eq!(nums, vec![5, 4]);
}
#[tokio::test]
async fn prune__noop_when_keep_exceeds_size() {
let store = MemorySchemaStore::new();
for i in 1..=3 {
store
.put_version(version("p1", "tools/list", i, &i.to_string()))
.await;
}
store.prune("p1", "tools/list", 10).await;
let all = store.list_versions("p1", "tools/list").await;
assert_eq!(all.len(), 3);
}
#[tokio::test]
async fn different_methods__isolated() {
let store = MemorySchemaStore::new();
store.put_version(version("p1", "tools/list", 1, "t")).await;
store
.put_version(version("p1", "prompts/list", 1, "p"))
.await;
assert_eq!(store.list_versions("p1", "tools/list").await.len(), 1);
assert_eq!(store.list_versions("p1", "prompts/list").await.len(), 1);
}
#[tokio::test]
async fn different_upstreams__isolated() {
let store = MemorySchemaStore::new();
store.put_version(version("p1", "tools/list", 1, "a")).await;
store.put_version(version("p2", "tools/list", 1, "b")).await;
let p1_latest = store
.latest_version_for_method("p1", "tools/list")
.await
.unwrap();
let p2_latest = store
.latest_version_for_method("p2", "tools/list")
.await
.unwrap();
assert_eq!(p1_latest.upstream_id, "p1");
assert_eq!(p2_latest.upstream_id, "p2");
}
#[test]
#[should_panic(expected = "capacity must be > 0")]
fn with_capacity__rejects_zero() {
let _ = MemorySchemaStore::with_capacity(0);
}
}