rivven_schema/storage/
mod.rs1mod memory;
12
13#[cfg(feature = "broker")]
14mod broker;
15
16pub use memory::MemoryStorage;
17
18#[cfg(feature = "broker")]
19pub use broker::{BrokerStorage, BrokerStorageStats};
20
21use crate::error::SchemaResult;
22use crate::types::{Schema, SchemaId, SchemaVersion, Subject, SubjectVersion, VersionState};
23use async_trait::async_trait;
24use std::sync::Arc;
25
26#[async_trait]
28pub trait StorageBackend: Send + Sync {
29 async fn store_schema(&self, schema: Schema) -> SchemaResult<SchemaId>;
31
32 async fn get_schema(&self, id: SchemaId) -> SchemaResult<Option<Schema>>;
34
35 async fn get_schema_by_fingerprint(&self, fingerprint: &str) -> SchemaResult<Option<Schema>>;
37
38 async fn register_subject_version(
40 &self,
41 subject: &Subject,
42 schema_id: SchemaId,
43 ) -> SchemaResult<SchemaVersion>;
44
45 async fn get_versions(&self, subject: &Subject) -> SchemaResult<Vec<u32>>;
47
48 async fn get_subject_version(
50 &self,
51 subject: &Subject,
52 version: SchemaVersion,
53 ) -> SchemaResult<Option<SubjectVersion>>;
54
55 async fn get_latest_version(&self, subject: &Subject) -> SchemaResult<Option<SubjectVersion>>;
57
58 async fn list_subjects(&self) -> SchemaResult<Vec<Subject>>;
60
61 async fn delete_subject(&self, subject: &Subject, permanent: bool) -> SchemaResult<Vec<u32>>;
63
64 async fn delete_version(
66 &self,
67 subject: &Subject,
68 version: SchemaVersion,
69 permanent: bool,
70 ) -> SchemaResult<()>;
71
72 async fn next_schema_id(&self) -> SchemaResult<SchemaId>;
74
75 async fn subject_exists(&self, subject: &Subject) -> SchemaResult<bool>;
77
78 async fn set_version_state(
80 &self,
81 subject: &Subject,
82 version: SchemaVersion,
83 state: VersionState,
84 ) -> SchemaResult<()>;
85
86 async fn get_version_state(
88 &self,
89 subject: &Subject,
90 version: SchemaVersion,
91 ) -> SchemaResult<VersionState>;
92
93 async fn list_deleted_subjects(&self) -> SchemaResult<Vec<Subject>>;
95
96 async fn undelete_subject(&self, subject: &Subject) -> SchemaResult<Vec<u32>>;
98}
99
100pub type Storage = Arc<dyn StorageBackend>;
102
103pub async fn create_storage(config: &crate::config::StorageConfig) -> SchemaResult<Storage> {
105 match config {
106 crate::config::StorageConfig::Memory => Ok(Arc::new(MemoryStorage::new())),
107 #[cfg(feature = "broker")]
108 crate::config::StorageConfig::Broker(broker_config) => {
109 let storage = BrokerStorage::new(broker_config.clone()).await?;
110 Ok(Arc::new(storage))
111 }
112 #[cfg(not(feature = "broker"))]
113 crate::config::StorageConfig::Broker(_) => Err(crate::error::SchemaError::Config(
114 "Broker storage requires the 'broker' feature. Enable it in Cargo.toml.".to_string(),
115 )),
116 crate::config::StorageConfig::Glue { .. } => {
117 Err(crate::error::SchemaError::Config(
120 "AWS Glue is an external registry. Use the `glue` feature with external registry configuration.".to_string()
121 ))
122 }
123 }
124}