Skip to main content

rivven_schema/storage/
mod.rs

1//! Storage backends for Schema Registry
2//!
3//! This module provides pluggable storage backends for the schema registry:
4//!
5//! - **Memory**: In-memory storage for development and testing
6//! - **Broker**: Durable storage in rivven topics (recommended for production)
7//!
8//! For external schema registries (Confluent, AWS Glue), use the `rivven-connect`
9//! crate's external registry support instead of these storage backends.
10
11mod memory;
12
13#[cfg(feature = "broker")]
14mod broker;
15
16pub use memory::MemoryStorage;
17
18#[cfg(feature = "broker")]
19pub use broker::{BrokerStorage, BrokerStorageStats};
20
21use crate::error::SchemaResult;
22use crate::types::{Schema, SchemaId, SchemaVersion, Subject, SubjectVersion, VersionState};
23use async_trait::async_trait;
24use std::sync::Arc;
25
26/// Storage backend trait for schema persistence
27#[async_trait]
28pub trait StorageBackend: Send + Sync {
29    /// Store a schema and return its ID
30    async fn store_schema(&self, schema: Schema) -> SchemaResult<SchemaId>;
31
32    /// Get schema by ID
33    async fn get_schema(&self, id: SchemaId) -> SchemaResult<Option<Schema>>;
34
35    /// Get schema by fingerprint (for deduplication)
36    async fn get_schema_by_fingerprint(&self, fingerprint: &str) -> SchemaResult<Option<Schema>>;
37
38    /// Register a schema under a subject
39    async fn register_subject_version(
40        &self,
41        subject: &Subject,
42        schema_id: SchemaId,
43    ) -> SchemaResult<SchemaVersion>;
44
45    /// Get all versions for a subject
46    async fn get_versions(&self, subject: &Subject) -> SchemaResult<Vec<u32>>;
47
48    /// Get a specific version of a subject
49    async fn get_subject_version(
50        &self,
51        subject: &Subject,
52        version: SchemaVersion,
53    ) -> SchemaResult<Option<SubjectVersion>>;
54
55    /// Get the latest version of a subject
56    async fn get_latest_version(&self, subject: &Subject) -> SchemaResult<Option<SubjectVersion>>;
57
58    /// List all subjects
59    async fn list_subjects(&self) -> SchemaResult<Vec<Subject>>;
60
61    /// Delete a subject (soft delete by default)
62    async fn delete_subject(&self, subject: &Subject, permanent: bool) -> SchemaResult<Vec<u32>>;
63
64    /// Delete a specific version
65    async fn delete_version(
66        &self,
67        subject: &Subject,
68        version: SchemaVersion,
69        permanent: bool,
70    ) -> SchemaResult<()>;
71
72    /// Get the next schema ID
73    async fn next_schema_id(&self) -> SchemaResult<SchemaId>;
74
75    /// Check if a subject exists
76    async fn subject_exists(&self, subject: &Subject) -> SchemaResult<bool>;
77
78    /// Set the state of a version (enabled/deprecated/disabled)
79    async fn set_version_state(
80        &self,
81        subject: &Subject,
82        version: SchemaVersion,
83        state: VersionState,
84    ) -> SchemaResult<()>;
85
86    /// Get the state of a version
87    async fn get_version_state(
88        &self,
89        subject: &Subject,
90        version: SchemaVersion,
91    ) -> SchemaResult<VersionState>;
92
93    /// List deleted subjects (for recovery)
94    async fn list_deleted_subjects(&self) -> SchemaResult<Vec<Subject>>;
95
96    /// Undelete a soft-deleted subject
97    async fn undelete_subject(&self, subject: &Subject) -> SchemaResult<Vec<u32>>;
98
99    /// Atomically register a schema under a subject in a single write.
100    ///
101    /// Combines ID allocation, schema storage, and subject-version registration
102    /// into one durable write to prevent partial state from crashes between
103    /// separate writes. Default implementation allocates an ID via
104    /// `next_schema_id`, then stores the schema and registers the version
105    /// (suitable for in-memory backends where atomicity is guaranteed).
106    async fn register_schema_atomic(
107        &self,
108        schema: Schema,
109        subject: &Subject,
110    ) -> SchemaResult<(SchemaId, SchemaVersion)> {
111        // Default: allocate ID + non-atomic fallback (3 separate writes)
112        let id = self.next_schema_id().await?;
113        let mut schema_with_id = schema;
114        schema_with_id.id = id;
115        let stored_id = self.store_schema(schema_with_id).await?;
116        let version = self.register_subject_version(subject, stored_id).await?;
117        Ok((stored_id, version))
118    }
119}
120
/// Type alias for a storage backend: a reference-counted trait object
/// (`Arc<dyn StorageBackend>`), which is what [`create_storage`] returns.
pub type Storage = Arc<dyn StorageBackend>;
123
124/// Create a storage backend from configuration
125pub async fn create_storage(config: &crate::config::StorageConfig) -> SchemaResult<Storage> {
126    match config {
127        crate::config::StorageConfig::Memory => Ok(Arc::new(MemoryStorage::new())),
128        #[cfg(feature = "broker")]
129        crate::config::StorageConfig::Broker(broker_config) => {
130            let storage = BrokerStorage::new(broker_config.clone()).await?;
131            Ok(Arc::new(storage))
132        }
133        #[cfg(not(feature = "broker"))]
134        crate::config::StorageConfig::Broker(_) => Err(crate::error::SchemaError::Config(
135            "Broker storage requires the 'broker' feature. Enable it in Cargo.toml.".to_string(),
136        )),
137        crate::config::StorageConfig::Glue { .. } => {
138            // AWS Glue is an external registry, not a storage backend
139            // Use the glue feature with external registry client instead
140            Err(crate::error::SchemaError::Config(
141                "AWS Glue is an external registry. Use the `glue` feature with external registry configuration.".to_string()
142            ))
143        }
144    }
145}