use std::collections::HashMap;
use tokio::sync::RwLock;
use crate::capsule::CapsuleId;
use crate::manifest::CapsuleManifest;
#[derive(Debug, Clone)]
pub struct TopicSchema {
pub capsule_id: CapsuleId,
pub wit_ref: String,
pub description: Option<String>,
pub schema: Option<serde_json::Value>,
}
#[derive(Debug, Default)]
pub struct SchemaCatalog {
schemas: RwLock<HashMap<String, TopicSchema>>,
}
impl SchemaCatalog {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub async fn register_topics(&self, capsule_id: &CapsuleId, manifest: &CapsuleManifest) {
let mut schemas = self.schemas.write().await;
let entries = manifest
.publishes
.iter()
.map(|(topic, def)| (topic, &def.wit))
.chain(
manifest
.subscribes
.iter()
.map(|(topic, def)| (topic, &def.wit)),
);
for (topic, wit_ref) in entries {
if let Some(prev) = schemas.get(topic) {
if &prev.wit_ref != wit_ref {
tracing::warn!(
capsule = %capsule_id,
topic = %topic,
publish_wit = %prev.wit_ref,
subscribe_wit = %wit_ref,
"topic declared in both [publish] and [subscribe] with \
conflicting wit refs; keeping the [subscribe] ref"
);
}
}
schemas.insert(
topic.clone(),
TopicSchema {
capsule_id: capsule_id.clone(),
wit_ref: wit_ref.clone(),
description: None,
schema: None,
},
);
}
}
pub async fn unregister_capsule(&self, capsule_id: &CapsuleId) {
let mut schemas = self.schemas.write().await;
schemas.retain(|_, v| &v.capsule_id != capsule_id);
}
pub async fn get(&self, topic: &str) -> Option<TopicSchema> {
self.schemas.read().await.get(topic).cloned()
}
pub async fn all(&self) -> HashMap<String, TopicSchema> {
self.schemas.read().await.clone()
}
pub async fn len(&self) -> usize {
self.schemas.read().await.len()
}
pub async fn is_empty(&self) -> bool {
self.schemas.read().await.is_empty()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::manifest::{PublishDef, SubscribeDef};
fn test_capsule_id() -> CapsuleId {
CapsuleId::from_static("test-capsule")
}
fn publish(wit: &str) -> PublishDef {
PublishDef {
wit: wit.into(),
version: None,
tag: None,
rev: None,
branch: None,
path: None,
fanout: false,
}
}
fn subscribe(wit: &str) -> SubscribeDef {
SubscribeDef {
wit: wit.into(),
version: None,
tag: None,
rev: None,
branch: None,
path: None,
handler: None,
priority: None,
}
}
fn manifest_with(publishes: &[(&str, &str)], subscribes: &[(&str, &str)]) -> CapsuleManifest {
CapsuleManifest {
publishes: publishes
.iter()
.map(|(t, w)| ((*t).to_string(), publish(w)))
.collect(),
subscribes: subscribes
.iter()
.map(|(t, w)| ((*t).to_string(), subscribe(w)))
.collect(),
..Default::default()
}
}
#[tokio::test]
async fn register_records_wit_ref_from_publish_table() {
let catalog = SchemaCatalog::new();
let manifest = manifest_with(
&[(
"registry.v1.active_model_changed",
"@unicity-astrid/wit/registry/active-model",
)],
&[],
);
catalog.register_topics(&test_capsule_id(), &manifest).await;
let schema = catalog
.get("registry.v1.active_model_changed")
.await
.expect("topic registered");
assert_eq!(schema.capsule_id, test_capsule_id());
assert_eq!(schema.wit_ref, "@unicity-astrid/wit/registry/active-model");
assert!(schema.description.is_none());
assert!(schema.schema.is_none());
}
#[tokio::test]
async fn register_covers_publish_and_subscribe() {
let catalog = SchemaCatalog::new();
let manifest = manifest_with(
&[("a.v1.foo", "@scope/wit/a/foo")],
&[("a.v1.bar", "opaque")],
);
catalog.register_topics(&test_capsule_id(), &manifest).await;
assert_eq!(catalog.len().await, 2);
assert_eq!(catalog.get("a.v1.bar").await.unwrap().wit_ref, "opaque");
}
#[tokio::test]
async fn unregister_capsule_removes_its_topics() {
let catalog = SchemaCatalog::new();
let id = test_capsule_id();
let manifest = manifest_with(
&[("a.v1.foo", "@scope/wit/a/foo")],
&[("a.v1.bar", "@scope/wit/a/bar")],
);
catalog.register_topics(&id, &manifest).await;
assert_eq!(catalog.len().await, 2);
catalog.unregister_capsule(&id).await;
assert!(catalog.is_empty().await);
}
#[tokio::test]
async fn multiple_capsules_independent() {
let catalog = SchemaCatalog::new();
let id_a = CapsuleId::from_static("capsule-a");
let id_b = CapsuleId::from_static("capsule-b");
catalog
.register_topics(
&id_a,
&manifest_with(&[("a.v1.event", "@scope/wit/a/e")], &[]),
)
.await;
catalog
.register_topics(
&id_b,
&manifest_with(&[("b.v1.event", "@scope/wit/b/e")], &[]),
)
.await;
assert_eq!(catalog.len().await, 2);
catalog.unregister_capsule(&id_a).await;
assert_eq!(catalog.len().await, 1);
assert!(catalog.get("b.v1.event").await.is_some());
assert!(catalog.get("a.v1.event").await.is_none());
}
}