Skip to main content

rivven_schema/storage/
memory.rs

1//! In-memory storage backend for testing and development
2
3use super::StorageBackend;
4use crate::error::{SchemaError, SchemaResult};
5use crate::types::{
6    Schema, SchemaId, SchemaType, SchemaVersion, Subject, SubjectVersion, VersionState,
7};
8use async_trait::async_trait;
9use dashmap::DashMap;
10use std::sync::atomic::{AtomicU32, Ordering};
11
12/// In-memory storage backend
13pub struct MemoryStorage {
14    /// Schema ID counter
15    next_id: AtomicU32,
16    /// Schemas by ID
17    schemas: DashMap<u32, Schema>,
18    /// Schema fingerprint -> ID mapping (for deduplication)
19    fingerprints: DashMap<String, u32>,
20    /// Subject -> versions mapping
21    subjects: DashMap<String, Vec<SubjectVersionEntry>>,
22    /// Deleted subjects (soft delete) - used for undelete feature
23    deleted_subjects: DashMap<String, Vec<SubjectVersionEntry>>,
24}
25
26#[derive(Clone)]
27struct SubjectVersionEntry {
28    version: u32,
29    schema_id: u32,
30    schema_type: SchemaType,
31    deleted: bool,
32    state: VersionState,
33}
34
35impl MemoryStorage {
36    pub fn new() -> Self {
37        Self {
38            next_id: AtomicU32::new(1),
39            schemas: DashMap::new(),
40            fingerprints: DashMap::new(),
41            subjects: DashMap::new(),
42            deleted_subjects: DashMap::new(),
43        }
44    }
45}
46
47impl Default for MemoryStorage {
48    fn default() -> Self {
49        Self::new()
50    }
51}
52
53#[async_trait]
54impl StorageBackend for MemoryStorage {
55    async fn store_schema(&self, schema: Schema) -> SchemaResult<SchemaId> {
56        let id = schema.id.0;
57        self.schemas.insert(id, schema.clone());
58
59        if let Some(ref fp) = schema.fingerprint {
60            self.fingerprints.insert(fp.clone(), id);
61        }
62
63        Ok(schema.id)
64    }
65
66    async fn get_schema(&self, id: SchemaId) -> SchemaResult<Option<Schema>> {
67        Ok(self.schemas.get(&id.0).map(|s| s.clone()))
68    }
69
70    async fn get_schema_by_fingerprint(&self, fingerprint: &str) -> SchemaResult<Option<Schema>> {
71        if let Some(id) = self.fingerprints.get(fingerprint) {
72            self.get_schema(SchemaId::new(*id)).await
73        } else {
74            Ok(None)
75        }
76    }
77
78    async fn register_subject_version(
79        &self,
80        subject: &Subject,
81        schema_id: SchemaId,
82    ) -> SchemaResult<SchemaVersion> {
83        let schema = self
84            .get_schema(schema_id)
85            .await?
86            .ok_or_else(|| SchemaError::NotFound(format!("Schema ID {}", schema_id)))?;
87
88        let mut entry = self.subjects.entry(subject.0.clone()).or_default();
89        let version = entry.len() as u32 + 1;
90
91        entry.push(SubjectVersionEntry {
92            version,
93            schema_id: schema_id.0,
94            schema_type: schema.schema_type,
95            deleted: false,
96            state: VersionState::Enabled,
97        });
98
99        Ok(SchemaVersion::new(version))
100    }
101
102    async fn get_versions(&self, subject: &Subject) -> SchemaResult<Vec<u32>> {
103        let entry = self.subjects.get(&subject.0);
104        match entry {
105            Some(versions) => Ok(versions
106                .iter()
107                .filter(|v| !v.deleted)
108                .map(|v| v.version)
109                .collect()),
110            None => Ok(Vec::new()),
111        }
112    }
113
114    async fn get_subject_version(
115        &self,
116        subject: &Subject,
117        version: SchemaVersion,
118    ) -> SchemaResult<Option<SubjectVersion>> {
119        let entry = self.subjects.get(&subject.0);
120        match entry {
121            Some(versions) => {
122                let v = versions
123                    .iter()
124                    .find(|v| v.version == version.0 && !v.deleted);
125
126                match v {
127                    Some(entry) => {
128                        let schema = self
129                            .get_schema(SchemaId::new(entry.schema_id))
130                            .await?
131                            .ok_or_else(|| {
132                                SchemaError::NotFound(format!("Schema ID {}", entry.schema_id))
133                            })?;
134
135                        Ok(Some(SubjectVersion {
136                            subject: subject.clone(),
137                            version: SchemaVersion::new(entry.version),
138                            id: SchemaId::new(entry.schema_id),
139                            schema_type: entry.schema_type,
140                            schema: schema.schema,
141                            state: entry.state,
142                        }))
143                    }
144                    None => Ok(None),
145                }
146            }
147            None => Ok(None),
148        }
149    }
150
151    async fn get_latest_version(&self, subject: &Subject) -> SchemaResult<Option<SubjectVersion>> {
152        let entry = self.subjects.get(&subject.0);
153        match entry {
154            Some(versions) => {
155                let latest = versions.iter().rev().find(|v| !v.deleted);
156
157                match latest {
158                    Some(entry) => {
159                        let schema = self
160                            .get_schema(SchemaId::new(entry.schema_id))
161                            .await?
162                            .ok_or_else(|| {
163                                SchemaError::NotFound(format!("Schema ID {}", entry.schema_id))
164                            })?;
165
166                        Ok(Some(SubjectVersion {
167                            subject: subject.clone(),
168                            version: SchemaVersion::new(entry.version),
169                            id: SchemaId::new(entry.schema_id),
170                            schema_type: entry.schema_type,
171                            schema: schema.schema,
172                            state: entry.state,
173                        }))
174                    }
175                    None => Ok(None),
176                }
177            }
178            None => Ok(None),
179        }
180    }
181
182    async fn list_subjects(&self) -> SchemaResult<Vec<Subject>> {
183        Ok(self
184            .subjects
185            .iter()
186            .filter(|e| e.iter().any(|v| !v.deleted))
187            .map(|e| Subject::new(e.key().clone()))
188            .collect())
189    }
190
191    async fn delete_subject(&self, subject: &Subject, permanent: bool) -> SchemaResult<Vec<u32>> {
192        if permanent {
193            // Remove from deleted_subjects too if it was soft-deleted before
194            self.deleted_subjects.remove(&subject.0);
195            if let Some((_, versions)) = self.subjects.remove(&subject.0) {
196                Ok(versions.iter().map(|v| v.version).collect())
197            } else {
198                Ok(Vec::new())
199            }
200        } else {
201            // Soft delete - move to deleted_subjects for recovery
202            if let Some((key, versions)) = self.subjects.remove(&subject.0) {
203                let version_nums: Vec<u32> = versions.iter().map(|v| v.version).collect();
204                // Store in deleted_subjects for potential recovery
205                self.deleted_subjects.insert(key, versions);
206                Ok(version_nums)
207            } else {
208                Ok(Vec::new())
209            }
210        }
211    }
212
213    async fn delete_version(
214        &self,
215        subject: &Subject,
216        version: SchemaVersion,
217        permanent: bool,
218    ) -> SchemaResult<()> {
219        if let Some(mut entry) = self.subjects.get_mut(&subject.0) {
220            if permanent {
221                entry.retain(|v| v.version != version.0);
222            } else if let Some(v) = entry.iter_mut().find(|v| v.version == version.0) {
223                v.deleted = true;
224            }
225        }
226        Ok(())
227    }
228
229    async fn next_schema_id(&self) -> SchemaResult<SchemaId> {
230        // Use fetch_update with checked_add to detect overflow at u32::MAX
231        // instead of silently wrapping to 0 (which would produce duplicate IDs).
232        let id = self
233            .next_id
234            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| v.checked_add(1))
235            .map_err(|_| {
236                SchemaError::Internal("Schema ID space exhausted (u32::MAX reached)".into())
237            })?;
238        Ok(SchemaId::new(id))
239    }
240
241    async fn subject_exists(&self, subject: &Subject) -> SchemaResult<bool> {
242        Ok(self.subjects.contains_key(&subject.0))
243    }
244
245    async fn set_version_state(
246        &self,
247        subject: &Subject,
248        version: SchemaVersion,
249        state: VersionState,
250    ) -> SchemaResult<()> {
251        if let Some(mut entry) = self.subjects.get_mut(&subject.0) {
252            if let Some(v) = entry
253                .iter_mut()
254                .find(|v| v.version == version.0 && !v.deleted)
255            {
256                v.state = state;
257                return Ok(());
258            }
259        }
260        Err(SchemaError::NotFound(format!(
261            "Version {} not found for subject {}",
262            version, subject
263        )))
264    }
265
266    async fn get_version_state(
267        &self,
268        subject: &Subject,
269        version: SchemaVersion,
270    ) -> SchemaResult<VersionState> {
271        if let Some(entry) = self.subjects.get(&subject.0) {
272            if let Some(v) = entry.iter().find(|v| v.version == version.0 && !v.deleted) {
273                return Ok(v.state);
274            }
275        }
276        Err(SchemaError::NotFound(format!(
277            "Version {} not found for subject {}",
278            version, subject
279        )))
280    }
281
282    async fn list_deleted_subjects(&self) -> SchemaResult<Vec<Subject>> {
283        Ok(self
284            .deleted_subjects
285            .iter()
286            .map(|e| Subject::new(e.key().clone()))
287            .collect())
288    }
289
290    async fn undelete_subject(&self, subject: &Subject) -> SchemaResult<Vec<u32>> {
291        if let Some((key, versions)) = self.deleted_subjects.remove(&subject.0) {
292            // Prevent collision: if a new subject was registered with the same
293            // name after the soft-delete, restoring would overwrite it.
294            if self.subjects.contains_key(&key) {
295                // Put back into deleted_subjects so the entry isn't lost
296                self.deleted_subjects.insert(key, versions);
297                return Err(SchemaError::AlreadyExists(format!(
298                    "Cannot undelete subject '{}': a subject with the same name already exists",
299                    subject
300                )));
301            }
302            let version_nums: Vec<u32> = versions.iter().map(|v| v.version).collect();
303            // Restore to active subjects
304            self.subjects.insert(key, versions);
305            Ok(version_nums)
306        } else {
307            Err(SchemaError::NotFound(format!(
308                "Subject '{}' not found in deleted subjects",
309                subject
310            )))
311        }
312    }
313}
314
#[cfg(test)]
mod tests {
    use super::*;

    /// Avro definition used by most tests.
    const STRING_SCHEMA: &str = r#"{"type":"string"}"#;
    /// Second Avro definition, used where two distinct schemas are needed.
    const INT_SCHEMA: &str = r#"{"type":"int"}"#;

    /// Allocates a fresh schema ID and stores an Avro schema with `definition`.
    async fn store_avro(storage: &MemoryStorage, definition: &str) -> SchemaId {
        let id = storage.next_schema_id().await.unwrap();
        let schema = Schema::new(id, SchemaType::Avro, definition.to_string());
        storage.store_schema(schema).await.unwrap();
        id
    }

    /// Stores the string schema and registers it as version 1 of `test-subject`.
    async fn seed_test_subject(storage: &MemoryStorage) -> Subject {
        let subject = Subject::new("test-subject");
        let id = store_avro(storage, STRING_SCHEMA).await;
        storage
            .register_subject_version(&subject, id)
            .await
            .unwrap();
        subject
    }

    #[tokio::test]
    async fn test_memory_storage_basic() {
        let storage = MemoryStorage::new();

        // Store a schema, keeping a copy to compare against.
        let id = storage.next_schema_id().await.unwrap();
        let schema = Schema::new(id, SchemaType::Avro, STRING_SCHEMA.to_string());
        storage.store_schema(schema.clone()).await.unwrap();

        // It must come back unchanged.
        let retrieved = storage.get_schema(id).await.unwrap().unwrap();
        assert_eq!(retrieved.schema, schema.schema);
    }

    #[tokio::test]
    async fn test_memory_storage_subject_versions() {
        let storage = MemoryStorage::new();
        let subject = Subject::new("test-subject");

        // First schema becomes version 1.
        let id1 = store_avro(&storage, STRING_SCHEMA).await;
        let v1 = storage
            .register_subject_version(&subject, id1)
            .await
            .unwrap();
        assert_eq!(v1.0, 1);

        // Second schema becomes version 2.
        let id2 = store_avro(&storage, INT_SCHEMA).await;
        let v2 = storage
            .register_subject_version(&subject, id2)
            .await
            .unwrap();
        assert_eq!(v2.0, 2);

        // Both versions are listed, in order.
        let versions = storage.get_versions(&subject).await.unwrap();
        assert_eq!(versions, vec![1, 2]);

        // The latest version is the second one.
        let latest = storage.get_latest_version(&subject).await.unwrap().unwrap();
        assert_eq!(latest.version.0, 2);
    }

    #[tokio::test]
    async fn test_memory_storage_soft_delete() {
        let storage = MemoryStorage::new();
        let subject = seed_test_subject(&storage).await;

        // Soft delete.
        storage.delete_subject(&subject, false).await.unwrap();

        // The subject no longer shows up as active...
        let subjects = storage.list_subjects().await.unwrap();
        assert!(subjects.is_empty());

        // ...but is listed among the deleted subjects.
        let deleted = storage.list_deleted_subjects().await.unwrap();
        assert_eq!(deleted.len(), 1);
        assert_eq!(deleted[0].0, "test-subject");
    }

    #[tokio::test]
    async fn test_memory_storage_undelete() {
        let storage = MemoryStorage::new();
        let subject = seed_test_subject(&storage).await;

        // Soft delete reports the removed version.
        let deleted_versions = storage.delete_subject(&subject, false).await.unwrap();
        assert_eq!(deleted_versions, vec![1]);

        // Nothing is active any more.
        let subjects = storage.list_subjects().await.unwrap();
        assert!(subjects.is_empty());

        // Undelete reports the restored version.
        let restored_versions = storage.undelete_subject(&subject).await.unwrap();
        assert_eq!(restored_versions, vec![1]);

        // The subject is active again.
        let subjects = storage.list_subjects().await.unwrap();
        assert_eq!(subjects.len(), 1);
        assert_eq!(subjects[0].0, "test-subject");

        // The deleted list is empty again.
        let deleted = storage.list_deleted_subjects().await.unwrap();
        assert!(deleted.is_empty());

        // The restored version can be fetched.
        let version = storage
            .get_subject_version(&subject, SchemaVersion::new(1))
            .await
            .unwrap();
        assert!(version.is_some());
    }

    #[tokio::test]
    async fn test_memory_storage_undelete_not_found() {
        let storage = MemoryStorage::new();
        let subject = Subject::new("nonexistent");

        // Undeleting a subject that was never deleted must fail.
        let result = storage.undelete_subject(&subject).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_memory_storage_permanent_delete_prevents_undelete() {
        let storage = MemoryStorage::new();
        let subject = seed_test_subject(&storage).await;

        // Permanent delete.
        storage.delete_subject(&subject, true).await.unwrap();

        // Nothing is parked for recovery.
        let deleted = storage.list_deleted_subjects().await.unwrap();
        assert!(deleted.is_empty());

        // So undelete has nothing to restore.
        let result = storage.undelete_subject(&subject).await;
        assert!(result.is_err());
    }
}