Skip to main content

rivven_schema/storage/
memory.rs

1//! In-memory storage backend for testing and development
2
3use super::StorageBackend;
4use crate::error::{SchemaError, SchemaResult};
5use crate::types::{
6    Schema, SchemaId, SchemaType, SchemaVersion, Subject, SubjectVersion, VersionState,
7};
8use async_trait::async_trait;
9use dashmap::DashMap;
10use std::sync::atomic::{AtomicU32, Ordering};
11
12/// In-memory storage backend
13pub struct MemoryStorage {
14    /// Schema ID counter
15    next_id: AtomicU32,
16    /// Schemas by ID
17    schemas: DashMap<u32, Schema>,
18    /// Schema fingerprint -> ID mapping (for deduplication)
19    fingerprints: DashMap<String, u32>,
20    /// Subject -> versions mapping
21    subjects: DashMap<String, Vec<SubjectVersionEntry>>,
22    /// Deleted subjects (soft delete) - used for undelete feature
23    deleted_subjects: DashMap<String, Vec<SubjectVersionEntry>>,
24}
25
26#[derive(Clone)]
27struct SubjectVersionEntry {
28    version: u32,
29    schema_id: u32,
30    schema_type: SchemaType,
31    deleted: bool,
32    state: VersionState,
33}
34
35impl MemoryStorage {
36    pub fn new() -> Self {
37        Self {
38            next_id: AtomicU32::new(1),
39            schemas: DashMap::new(),
40            fingerprints: DashMap::new(),
41            subjects: DashMap::new(),
42            deleted_subjects: DashMap::new(),
43        }
44    }
45}
46
47impl Default for MemoryStorage {
48    fn default() -> Self {
49        Self::new()
50    }
51}
52
53#[async_trait]
54impl StorageBackend for MemoryStorage {
55    async fn store_schema(&self, schema: Schema) -> SchemaResult<SchemaId> {
56        let id = schema.id.0;
57        self.schemas.insert(id, schema.clone());
58
59        if let Some(ref fp) = schema.fingerprint {
60            self.fingerprints.insert(fp.clone(), id);
61        }
62
63        Ok(schema.id)
64    }
65
66    async fn get_schema(&self, id: SchemaId) -> SchemaResult<Option<Schema>> {
67        Ok(self.schemas.get(&id.0).map(|s| s.clone()))
68    }
69
70    async fn get_schema_by_fingerprint(&self, fingerprint: &str) -> SchemaResult<Option<Schema>> {
71        if let Some(id) = self.fingerprints.get(fingerprint) {
72            self.get_schema(SchemaId::new(*id)).await
73        } else {
74            Ok(None)
75        }
76    }
77
78    async fn register_subject_version(
79        &self,
80        subject: &Subject,
81        schema_id: SchemaId,
82    ) -> SchemaResult<SchemaVersion> {
83        let schema = self
84            .get_schema(schema_id)
85            .await?
86            .ok_or_else(|| SchemaError::NotFound(format!("Schema ID {}", schema_id)))?;
87
88        let mut entry = self.subjects.entry(subject.0.clone()).or_default();
89        let version = entry.len() as u32 + 1;
90
91        entry.push(SubjectVersionEntry {
92            version,
93            schema_id: schema_id.0,
94            schema_type: schema.schema_type,
95            deleted: false,
96            state: VersionState::Enabled,
97        });
98
99        Ok(SchemaVersion::new(version))
100    }
101
102    async fn get_versions(&self, subject: &Subject) -> SchemaResult<Vec<u32>> {
103        let entry = self.subjects.get(&subject.0);
104        match entry {
105            Some(versions) => Ok(versions
106                .iter()
107                .filter(|v| !v.deleted)
108                .map(|v| v.version)
109                .collect()),
110            None => Ok(Vec::new()),
111        }
112    }
113
114    async fn get_subject_version(
115        &self,
116        subject: &Subject,
117        version: SchemaVersion,
118    ) -> SchemaResult<Option<SubjectVersion>> {
119        let entry = self.subjects.get(&subject.0);
120        match entry {
121            Some(versions) => {
122                let v = versions
123                    .iter()
124                    .find(|v| v.version == version.0 && !v.deleted);
125
126                match v {
127                    Some(entry) => {
128                        let schema = self
129                            .get_schema(SchemaId::new(entry.schema_id))
130                            .await?
131                            .ok_or_else(|| {
132                                SchemaError::NotFound(format!("Schema ID {}", entry.schema_id))
133                            })?;
134
135                        Ok(Some(SubjectVersion {
136                            subject: subject.clone(),
137                            version: SchemaVersion::new(entry.version),
138                            id: SchemaId::new(entry.schema_id),
139                            schema_type: entry.schema_type,
140                            schema: schema.schema,
141                            state: entry.state,
142                        }))
143                    }
144                    None => Ok(None),
145                }
146            }
147            None => Ok(None),
148        }
149    }
150
151    async fn get_latest_version(&self, subject: &Subject) -> SchemaResult<Option<SubjectVersion>> {
152        let entry = self.subjects.get(&subject.0);
153        match entry {
154            Some(versions) => {
155                let latest = versions.iter().rev().find(|v| !v.deleted);
156
157                match latest {
158                    Some(entry) => {
159                        let schema = self
160                            .get_schema(SchemaId::new(entry.schema_id))
161                            .await?
162                            .ok_or_else(|| {
163                                SchemaError::NotFound(format!("Schema ID {}", entry.schema_id))
164                            })?;
165
166                        Ok(Some(SubjectVersion {
167                            subject: subject.clone(),
168                            version: SchemaVersion::new(entry.version),
169                            id: SchemaId::new(entry.schema_id),
170                            schema_type: entry.schema_type,
171                            schema: schema.schema,
172                            state: entry.state,
173                        }))
174                    }
175                    None => Ok(None),
176                }
177            }
178            None => Ok(None),
179        }
180    }
181
182    async fn list_subjects(&self) -> SchemaResult<Vec<Subject>> {
183        Ok(self
184            .subjects
185            .iter()
186            .filter(|e| e.iter().any(|v| !v.deleted))
187            .map(|e| Subject::new(e.key().clone()))
188            .collect())
189    }
190
191    async fn delete_subject(&self, subject: &Subject, permanent: bool) -> SchemaResult<Vec<u32>> {
192        if permanent {
193            // Remove from deleted_subjects too if it was soft-deleted before
194            self.deleted_subjects.remove(&subject.0);
195            if let Some((_, versions)) = self.subjects.remove(&subject.0) {
196                Ok(versions.iter().map(|v| v.version).collect())
197            } else {
198                Ok(Vec::new())
199            }
200        } else {
201            // Soft delete - move to deleted_subjects for recovery
202            if let Some((key, versions)) = self.subjects.remove(&subject.0) {
203                let version_nums: Vec<u32> = versions.iter().map(|v| v.version).collect();
204                // Store in deleted_subjects for potential recovery
205                self.deleted_subjects.insert(key, versions);
206                Ok(version_nums)
207            } else {
208                Ok(Vec::new())
209            }
210        }
211    }
212
213    async fn delete_version(
214        &self,
215        subject: &Subject,
216        version: SchemaVersion,
217        permanent: bool,
218    ) -> SchemaResult<()> {
219        if let Some(mut entry) = self.subjects.get_mut(&subject.0) {
220            if permanent {
221                entry.retain(|v| v.version != version.0);
222            } else if let Some(v) = entry.iter_mut().find(|v| v.version == version.0) {
223                v.deleted = true;
224            }
225        }
226        Ok(())
227    }
228
229    async fn next_schema_id(&self) -> SchemaResult<SchemaId> {
230        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
231        Ok(SchemaId::new(id))
232    }
233
234    async fn subject_exists(&self, subject: &Subject) -> SchemaResult<bool> {
235        Ok(self.subjects.contains_key(&subject.0))
236    }
237
238    async fn set_version_state(
239        &self,
240        subject: &Subject,
241        version: SchemaVersion,
242        state: VersionState,
243    ) -> SchemaResult<()> {
244        if let Some(mut entry) = self.subjects.get_mut(&subject.0) {
245            if let Some(v) = entry
246                .iter_mut()
247                .find(|v| v.version == version.0 && !v.deleted)
248            {
249                v.state = state;
250                return Ok(());
251            }
252        }
253        Err(SchemaError::NotFound(format!(
254            "Version {} not found for subject {}",
255            version, subject
256        )))
257    }
258
259    async fn get_version_state(
260        &self,
261        subject: &Subject,
262        version: SchemaVersion,
263    ) -> SchemaResult<VersionState> {
264        if let Some(entry) = self.subjects.get(&subject.0) {
265            if let Some(v) = entry.iter().find(|v| v.version == version.0 && !v.deleted) {
266                return Ok(v.state);
267            }
268        }
269        Err(SchemaError::NotFound(format!(
270            "Version {} not found for subject {}",
271            version, subject
272        )))
273    }
274
275    async fn list_deleted_subjects(&self) -> SchemaResult<Vec<Subject>> {
276        Ok(self
277            .deleted_subjects
278            .iter()
279            .map(|e| Subject::new(e.key().clone()))
280            .collect())
281    }
282
283    async fn undelete_subject(&self, subject: &Subject) -> SchemaResult<Vec<u32>> {
284        if let Some((key, versions)) = self.deleted_subjects.remove(&subject.0) {
285            let version_nums: Vec<u32> = versions.iter().map(|v| v.version).collect();
286            // Restore to active subjects
287            self.subjects.insert(key, versions);
288            Ok(version_nums)
289        } else {
290            Err(SchemaError::NotFound(format!(
291                "Subject '{}' not found in deleted subjects",
292                subject
293            )))
294        }
295    }
296}
297
298#[cfg(test)]
299mod tests {
300    use super::*;
301
302    #[tokio::test]
303    async fn test_memory_storage_basic() {
304        let storage = MemoryStorage::new();
305
306        // Store a schema
307        let id = storage.next_schema_id().await.unwrap();
308        let schema = Schema::new(id, SchemaType::Avro, r#"{"type":"string"}"#.to_string());
309        storage.store_schema(schema.clone()).await.unwrap();
310
311        // Retrieve it
312        let retrieved = storage.get_schema(id).await.unwrap().unwrap();
313        assert_eq!(retrieved.schema, schema.schema);
314    }
315
316    #[tokio::test]
317    async fn test_memory_storage_subject_versions() {
318        let storage = MemoryStorage::new();
319        let subject = Subject::new("test-subject");
320
321        // Store schema v1
322        let id1 = storage.next_schema_id().await.unwrap();
323        let schema1 = Schema::new(id1, SchemaType::Avro, r#"{"type":"string"}"#.to_string());
324        storage.store_schema(schema1).await.unwrap();
325        let v1 = storage
326            .register_subject_version(&subject, id1)
327            .await
328            .unwrap();
329        assert_eq!(v1.0, 1);
330
331        // Store schema v2
332        let id2 = storage.next_schema_id().await.unwrap();
333        let schema2 = Schema::new(id2, SchemaType::Avro, r#"{"type":"int"}"#.to_string());
334        storage.store_schema(schema2).await.unwrap();
335        let v2 = storage
336            .register_subject_version(&subject, id2)
337            .await
338            .unwrap();
339        assert_eq!(v2.0, 2);
340
341        // Check versions
342        let versions = storage.get_versions(&subject).await.unwrap();
343        assert_eq!(versions, vec![1, 2]);
344
345        // Get latest
346        let latest = storage.get_latest_version(&subject).await.unwrap().unwrap();
347        assert_eq!(latest.version.0, 2);
348    }
349
350    #[tokio::test]
351    async fn test_memory_storage_soft_delete() {
352        let storage = MemoryStorage::new();
353        let subject = Subject::new("test-subject");
354
355        // Store and register
356        let id = storage.next_schema_id().await.unwrap();
357        let schema = Schema::new(id, SchemaType::Avro, r#"{"type":"string"}"#.to_string());
358        storage.store_schema(schema).await.unwrap();
359        storage
360            .register_subject_version(&subject, id)
361            .await
362            .unwrap();
363
364        // Soft delete
365        storage.delete_subject(&subject, false).await.unwrap();
366
367        // Subject should appear deleted
368        let subjects = storage.list_subjects().await.unwrap();
369        assert!(subjects.is_empty());
370
371        // Should be in deleted subjects list
372        let deleted = storage.list_deleted_subjects().await.unwrap();
373        assert_eq!(deleted.len(), 1);
374        assert_eq!(deleted[0].0, "test-subject");
375    }
376
377    #[tokio::test]
378    async fn test_memory_storage_undelete() {
379        let storage = MemoryStorage::new();
380        let subject = Subject::new("test-subject");
381
382        // Store and register
383        let id = storage.next_schema_id().await.unwrap();
384        let schema = Schema::new(id, SchemaType::Avro, r#"{"type":"string"}"#.to_string());
385        storage.store_schema(schema).await.unwrap();
386        storage
387            .register_subject_version(&subject, id)
388            .await
389            .unwrap();
390
391        // Soft delete
392        let deleted_versions = storage.delete_subject(&subject, false).await.unwrap();
393        assert_eq!(deleted_versions, vec![1]);
394
395        // Verify deleted
396        let subjects = storage.list_subjects().await.unwrap();
397        assert!(subjects.is_empty());
398
399        // Undelete
400        let restored_versions = storage.undelete_subject(&subject).await.unwrap();
401        assert_eq!(restored_versions, vec![1]);
402
403        // Subject should be back
404        let subjects = storage.list_subjects().await.unwrap();
405        assert_eq!(subjects.len(), 1);
406        assert_eq!(subjects[0].0, "test-subject");
407
408        // Deleted list should be empty
409        let deleted = storage.list_deleted_subjects().await.unwrap();
410        assert!(deleted.is_empty());
411
412        // Version should be accessible
413        let version = storage
414            .get_subject_version(&subject, SchemaVersion::new(1))
415            .await
416            .unwrap();
417        assert!(version.is_some());
418    }
419
420    #[tokio::test]
421    async fn test_memory_storage_undelete_not_found() {
422        let storage = MemoryStorage::new();
423        let subject = Subject::new("nonexistent");
424
425        // Try to undelete non-existent subject
426        let result = storage.undelete_subject(&subject).await;
427        assert!(result.is_err());
428    }
429
430    #[tokio::test]
431    async fn test_memory_storage_permanent_delete_prevents_undelete() {
432        let storage = MemoryStorage::new();
433        let subject = Subject::new("test-subject");
434
435        // Store and register
436        let id = storage.next_schema_id().await.unwrap();
437        let schema = Schema::new(id, SchemaType::Avro, r#"{"type":"string"}"#.to_string());
438        storage.store_schema(schema).await.unwrap();
439        storage
440            .register_subject_version(&subject, id)
441            .await
442            .unwrap();
443
444        // Permanent delete
445        storage.delete_subject(&subject, true).await.unwrap();
446
447        // Should not be in deleted subjects
448        let deleted = storage.list_deleted_subjects().await.unwrap();
449        assert!(deleted.is_empty());
450
451        // Undelete should fail
452        let result = storage.undelete_subject(&subject).await;
453        assert!(result.is_err());
454    }
455}