Skip to main content

rivven_schema/storage/
mod.rs

//! Storage backends for Schema Registry
//!
//! This module provides pluggable storage backends for the schema registry:
//!
//! - **Memory**: In-memory storage for development and testing
//! - **Broker**: Durable storage in rivven topics (recommended for production)
//!
//! For external schema registries (Confluent, AWS Glue), use the `rivven-connect`
//! crate's external registry support instead of these storage backends.
10
11mod memory;
12
13#[cfg(feature = "broker")]
14mod broker;
15
16pub use memory::MemoryStorage;
17
18#[cfg(feature = "broker")]
19pub use broker::{BrokerStorage, BrokerStorageStats};
20
21use crate::error::SchemaResult;
22use crate::types::{Schema, SchemaId, SchemaVersion, Subject, SubjectVersion, VersionState};
23use async_trait::async_trait;
24use std::sync::Arc;
25
26/// Storage backend trait for schema persistence
27#[async_trait]
28pub trait StorageBackend: Send + Sync {
29    /// Store a schema and return its ID
30    async fn store_schema(&self, schema: Schema) -> SchemaResult<SchemaId>;
31
32    /// Get schema by ID
33    async fn get_schema(&self, id: SchemaId) -> SchemaResult<Option<Schema>>;
34
35    /// Get schema by fingerprint (for deduplication)
36    async fn get_schema_by_fingerprint(&self, fingerprint: &str) -> SchemaResult<Option<Schema>>;
37
38    /// Register a schema under a subject
39    async fn register_subject_version(
40        &self,
41        subject: &Subject,
42        schema_id: SchemaId,
43    ) -> SchemaResult<SchemaVersion>;
44
45    /// Get all versions for a subject
46    async fn get_versions(&self, subject: &Subject) -> SchemaResult<Vec<u32>>;
47
48    /// Get a specific version of a subject
49    async fn get_subject_version(
50        &self,
51        subject: &Subject,
52        version: SchemaVersion,
53    ) -> SchemaResult<Option<SubjectVersion>>;
54
55    /// Get the latest version of a subject
56    async fn get_latest_version(&self, subject: &Subject) -> SchemaResult<Option<SubjectVersion>>;
57
58    /// List all subjects
59    async fn list_subjects(&self) -> SchemaResult<Vec<Subject>>;
60
61    /// Delete a subject (soft delete by default)
62    async fn delete_subject(&self, subject: &Subject, permanent: bool) -> SchemaResult<Vec<u32>>;
63
64    /// Delete a specific version
65    async fn delete_version(
66        &self,
67        subject: &Subject,
68        version: SchemaVersion,
69        permanent: bool,
70    ) -> SchemaResult<()>;
71
72    /// Get the next schema ID
73    async fn next_schema_id(&self) -> SchemaResult<SchemaId>;
74
75    /// Check if a subject exists
76    async fn subject_exists(&self, subject: &Subject) -> SchemaResult<bool>;
77
78    /// Set the state of a version (enabled/deprecated/disabled)
79    async fn set_version_state(
80        &self,
81        subject: &Subject,
82        version: SchemaVersion,
83        state: VersionState,
84    ) -> SchemaResult<()>;
85
86    /// Get the state of a version
87    async fn get_version_state(
88        &self,
89        subject: &Subject,
90        version: SchemaVersion,
91    ) -> SchemaResult<VersionState>;
92
93    /// List deleted subjects (for recovery)
94    async fn list_deleted_subjects(&self) -> SchemaResult<Vec<Subject>>;
95
96    /// Undelete a soft-deleted subject
97    async fn undelete_subject(&self, subject: &Subject) -> SchemaResult<Vec<u32>>;
98}
99
100/// Type alias for storage backend
101pub type Storage = Arc<dyn StorageBackend>;
102
103/// Create a storage backend from configuration
104pub async fn create_storage(config: &crate::config::StorageConfig) -> SchemaResult<Storage> {
105    match config {
106        crate::config::StorageConfig::Memory => Ok(Arc::new(MemoryStorage::new())),
107        #[cfg(feature = "broker")]
108        crate::config::StorageConfig::Broker(broker_config) => {
109            let storage = BrokerStorage::new(broker_config.clone()).await?;
110            Ok(Arc::new(storage))
111        }
112        #[cfg(not(feature = "broker"))]
113        crate::config::StorageConfig::Broker(_) => Err(crate::error::SchemaError::Config(
114            "Broker storage requires the 'broker' feature. Enable it in Cargo.toml.".to_string(),
115        )),
116        crate::config::StorageConfig::Glue { .. } => {
117            // AWS Glue is an external registry, not a storage backend
118            // Use the glue feature with external registry client instead
119            Err(crate::error::SchemaError::Config(
120                "AWS Glue is an external registry. Use the `glue` feature with external registry configuration.".to_string()
121            ))
122        }
123    }
124}