use std::collections::HashMap;
use tokio::sync::RwLock;
use crate::capsule::CapsuleId;
/// Catalog entry describing a single registered topic.
///
/// One entry is stored per topic name by [`SchemaCatalog::register_topics`].
#[derive(Debug, Clone)]
pub struct TopicSchema {
/// Capsule that was passed to `register_topics` when this entry was stored.
pub capsule_id: CapsuleId,
/// Description copied from the topic definition, if it had one.
pub description: Option<String>,
/// Schema value found under the topic's name in the baked schema map at
/// registration time; `None` when the map had no entry for the topic.
pub schema: Option<serde_json::Value>,
}
/// Topic-name → [`TopicSchema`] lookup table guarded by an async `RwLock`.
///
/// All access goes through `&self` methods that take the lock internally.
#[derive(Debug, Default)]
pub struct SchemaCatalog {
// Keyed by topic name; registering a name that already exists replaces the entry.
schemas: RwLock<HashMap<String, TopicSchema>>,
}
impl SchemaCatalog {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub async fn register_topics(
&self,
capsule_id: &CapsuleId,
topics: &[crate::manifest::TopicDef],
baked_schemas: &HashMap<String, serde_json::Value>,
) {
let mut schemas = self.schemas.write().await;
for topic in topics {
schemas.insert(
topic.name.clone(),
TopicSchema {
capsule_id: capsule_id.clone(),
description: topic.description.clone(),
schema: baked_schemas.get(&topic.name).cloned(),
},
);
}
}
pub async fn unregister_capsule(&self, capsule_id: &CapsuleId) {
let mut schemas = self.schemas.write().await;
schemas.retain(|_, v| &v.capsule_id != capsule_id);
}
pub async fn get(&self, topic: &str) -> Option<TopicSchema> {
self.schemas.read().await.get(topic).cloned()
}
pub async fn all(&self) -> HashMap<String, TopicSchema> {
self.schemas.read().await.clone()
}
pub async fn len(&self) -> usize {
self.schemas.read().await.len()
}
pub async fn is_empty(&self) -> bool {
self.schemas.read().await.is_empty()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::manifest::{TopicDef, TopicDirection};

    /// Topic name shared by the lookup-oriented tests.
    const MODEL_CHANGED: &str = "registry.v1.active_model_changed";

    fn test_capsule_id() -> CapsuleId {
        CapsuleId::from_static("test-capsule")
    }

    /// Builds a topic definition with only a name and direction set.
    fn bare_topic(name: &str, direction: TopicDirection) -> TopicDef {
        TopicDef {
            name: name.into(),
            direction,
            description: None,
            schema: None,
            wit_type: None,
        }
    }

    /// Builds the described, published `MODEL_CHANGED` topic used by the lookup tests.
    fn model_changed_topic() -> TopicDef {
        let mut def = bare_topic(MODEL_CHANGED, TopicDirection::Publish);
        def.description = Some("Published when the active model changes".into());
        def
    }

    #[tokio::test]
    async fn register_and_lookup() {
        let catalog = SchemaCatalog::new();
        let no_baked = HashMap::new();

        catalog
            .register_topics(&test_capsule_id(), &[model_changed_topic()], &no_baked)
            .await;

        let entry = catalog
            .get(MODEL_CHANGED)
            .await
            .expect("topic should be registered");
        assert_eq!(entry.capsule_id, test_capsule_id());
        assert!(entry.description.is_some());
        assert!(entry.schema.is_none());
    }

    #[tokio::test]
    async fn register_with_baked_schema() {
        let catalog = SchemaCatalog::new();

        let mut def = model_changed_topic();
        def.wit_type = Some("provider-entry".into());

        let baked: HashMap<String, serde_json::Value> = HashMap::from([(
            MODEL_CHANGED.to_owned(),
            serde_json::json!({
                "type": "object",
                "properties": {
                    "id": {"type": "string", "description": "Model ID"}
                }
            }),
        )]);

        catalog
            .register_topics(&test_capsule_id(), &[def], &baked)
            .await;

        let entry = catalog
            .get(MODEL_CHANGED)
            .await
            .expect("topic should be registered");
        let json_schema = entry.schema.expect("baked schema should be attached");
        assert_eq!(json_schema["properties"]["id"]["type"], "string");
    }

    #[tokio::test]
    async fn unregister_capsule_removes_its_topics() {
        let catalog = SchemaCatalog::new();
        let owner = test_capsule_id();
        let defs = [
            bare_topic("a.v1.foo", TopicDirection::Publish),
            bare_topic("a.v1.bar", TopicDirection::Subscribe),
        ];

        catalog.register_topics(&owner, &defs, &HashMap::new()).await;
        assert_eq!(catalog.len().await, 2);

        catalog.unregister_capsule(&owner).await;
        assert!(catalog.is_empty().await);
    }

    #[tokio::test]
    async fn multiple_capsules_independent() {
        let catalog = SchemaCatalog::new();
        let id_a = CapsuleId::from_static("capsule-a");
        let id_b = CapsuleId::from_static("capsule-b");
        let no_baked = HashMap::new();

        // Register one published topic per capsule, in the same order as before.
        let topic_a = bare_topic("a.v1.event", TopicDirection::Publish);
        catalog.register_topics(&id_a, &[topic_a], &no_baked).await;
        let topic_b = bare_topic("b.v1.event", TopicDirection::Publish);
        catalog.register_topics(&id_b, &[topic_b], &no_baked).await;
        assert_eq!(catalog.len().await, 2);

        // Removing capsule A must leave capsule B's topic in place.
        catalog.unregister_capsule(&id_a).await;
        assert_eq!(catalog.len().await, 1);
        assert!(catalog.get("b.v1.event").await.is_some());
        assert!(catalog.get("a.v1.event").await.is_none());
    }
}