1use super::StorageBackend;
4use crate::error::{SchemaError, SchemaResult};
5use crate::types::{
6 Schema, SchemaId, SchemaType, SchemaVersion, Subject, SubjectVersion, VersionState,
7};
8use async_trait::async_trait;
9use dashmap::DashMap;
10use std::sync::atomic::{AtomicU32, Ordering};
11
12pub struct MemoryStorage {
14 next_id: AtomicU32,
16 schemas: DashMap<u32, Schema>,
18 fingerprints: DashMap<String, u32>,
20 subjects: DashMap<String, Vec<SubjectVersionEntry>>,
22 deleted_subjects: DashMap<String, Vec<SubjectVersionEntry>>,
24}
25
26#[derive(Clone)]
27struct SubjectVersionEntry {
28 version: u32,
29 schema_id: u32,
30 schema_type: SchemaType,
31 deleted: bool,
32 state: VersionState,
33}
34
35impl MemoryStorage {
36 pub fn new() -> Self {
37 Self {
38 next_id: AtomicU32::new(1),
39 schemas: DashMap::new(),
40 fingerprints: DashMap::new(),
41 subjects: DashMap::new(),
42 deleted_subjects: DashMap::new(),
43 }
44 }
45}
46
47impl Default for MemoryStorage {
48 fn default() -> Self {
49 Self::new()
50 }
51}
52
53#[async_trait]
54impl StorageBackend for MemoryStorage {
55 async fn store_schema(&self, schema: Schema) -> SchemaResult<SchemaId> {
56 let id = schema.id.0;
57 self.schemas.insert(id, schema.clone());
58
59 if let Some(ref fp) = schema.fingerprint {
60 self.fingerprints.insert(fp.clone(), id);
61 }
62
63 Ok(schema.id)
64 }
65
66 async fn get_schema(&self, id: SchemaId) -> SchemaResult<Option<Schema>> {
67 Ok(self.schemas.get(&id.0).map(|s| s.clone()))
68 }
69
70 async fn get_schema_by_fingerprint(&self, fingerprint: &str) -> SchemaResult<Option<Schema>> {
71 if let Some(id) = self.fingerprints.get(fingerprint) {
72 self.get_schema(SchemaId::new(*id)).await
73 } else {
74 Ok(None)
75 }
76 }
77
78 async fn register_subject_version(
79 &self,
80 subject: &Subject,
81 schema_id: SchemaId,
82 ) -> SchemaResult<SchemaVersion> {
83 let schema = self
84 .get_schema(schema_id)
85 .await?
86 .ok_or_else(|| SchemaError::NotFound(format!("Schema ID {}", schema_id)))?;
87
88 let mut entry = self.subjects.entry(subject.0.clone()).or_default();
89 let version = entry.len() as u32 + 1;
90
91 entry.push(SubjectVersionEntry {
92 version,
93 schema_id: schema_id.0,
94 schema_type: schema.schema_type,
95 deleted: false,
96 state: VersionState::Enabled,
97 });
98
99 Ok(SchemaVersion::new(version))
100 }
101
102 async fn get_versions(&self, subject: &Subject) -> SchemaResult<Vec<u32>> {
103 let entry = self.subjects.get(&subject.0);
104 match entry {
105 Some(versions) => Ok(versions
106 .iter()
107 .filter(|v| !v.deleted)
108 .map(|v| v.version)
109 .collect()),
110 None => Ok(Vec::new()),
111 }
112 }
113
114 async fn get_subject_version(
115 &self,
116 subject: &Subject,
117 version: SchemaVersion,
118 ) -> SchemaResult<Option<SubjectVersion>> {
119 let entry = self.subjects.get(&subject.0);
120 match entry {
121 Some(versions) => {
122 let v = versions
123 .iter()
124 .find(|v| v.version == version.0 && !v.deleted);
125
126 match v {
127 Some(entry) => {
128 let schema = self
129 .get_schema(SchemaId::new(entry.schema_id))
130 .await?
131 .ok_or_else(|| {
132 SchemaError::NotFound(format!("Schema ID {}", entry.schema_id))
133 })?;
134
135 Ok(Some(SubjectVersion {
136 subject: subject.clone(),
137 version: SchemaVersion::new(entry.version),
138 id: SchemaId::new(entry.schema_id),
139 schema_type: entry.schema_type,
140 schema: schema.schema,
141 state: entry.state,
142 }))
143 }
144 None => Ok(None),
145 }
146 }
147 None => Ok(None),
148 }
149 }
150
151 async fn get_latest_version(&self, subject: &Subject) -> SchemaResult<Option<SubjectVersion>> {
152 let entry = self.subjects.get(&subject.0);
153 match entry {
154 Some(versions) => {
155 let latest = versions.iter().rev().find(|v| !v.deleted);
156
157 match latest {
158 Some(entry) => {
159 let schema = self
160 .get_schema(SchemaId::new(entry.schema_id))
161 .await?
162 .ok_or_else(|| {
163 SchemaError::NotFound(format!("Schema ID {}", entry.schema_id))
164 })?;
165
166 Ok(Some(SubjectVersion {
167 subject: subject.clone(),
168 version: SchemaVersion::new(entry.version),
169 id: SchemaId::new(entry.schema_id),
170 schema_type: entry.schema_type,
171 schema: schema.schema,
172 state: entry.state,
173 }))
174 }
175 None => Ok(None),
176 }
177 }
178 None => Ok(None),
179 }
180 }
181
182 async fn list_subjects(&self) -> SchemaResult<Vec<Subject>> {
183 Ok(self
184 .subjects
185 .iter()
186 .filter(|e| e.iter().any(|v| !v.deleted))
187 .map(|e| Subject::new(e.key().clone()))
188 .collect())
189 }
190
191 async fn delete_subject(&self, subject: &Subject, permanent: bool) -> SchemaResult<Vec<u32>> {
192 if permanent {
193 self.deleted_subjects.remove(&subject.0);
195 if let Some((_, versions)) = self.subjects.remove(&subject.0) {
196 Ok(versions.iter().map(|v| v.version).collect())
197 } else {
198 Ok(Vec::new())
199 }
200 } else {
201 if let Some((key, versions)) = self.subjects.remove(&subject.0) {
203 let version_nums: Vec<u32> = versions.iter().map(|v| v.version).collect();
204 self.deleted_subjects.insert(key, versions);
206 Ok(version_nums)
207 } else {
208 Ok(Vec::new())
209 }
210 }
211 }
212
213 async fn delete_version(
214 &self,
215 subject: &Subject,
216 version: SchemaVersion,
217 permanent: bool,
218 ) -> SchemaResult<()> {
219 if let Some(mut entry) = self.subjects.get_mut(&subject.0) {
220 if permanent {
221 entry.retain(|v| v.version != version.0);
222 } else if let Some(v) = entry.iter_mut().find(|v| v.version == version.0) {
223 v.deleted = true;
224 }
225 }
226 Ok(())
227 }
228
229 async fn next_schema_id(&self) -> SchemaResult<SchemaId> {
230 let id = self
233 .next_id
234 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| v.checked_add(1))
235 .map_err(|_| {
236 SchemaError::Internal("Schema ID space exhausted (u32::MAX reached)".into())
237 })?;
238 Ok(SchemaId::new(id))
239 }
240
241 async fn subject_exists(&self, subject: &Subject) -> SchemaResult<bool> {
242 Ok(self.subjects.contains_key(&subject.0))
243 }
244
245 async fn set_version_state(
246 &self,
247 subject: &Subject,
248 version: SchemaVersion,
249 state: VersionState,
250 ) -> SchemaResult<()> {
251 if let Some(mut entry) = self.subjects.get_mut(&subject.0) {
252 if let Some(v) = entry
253 .iter_mut()
254 .find(|v| v.version == version.0 && !v.deleted)
255 {
256 v.state = state;
257 return Ok(());
258 }
259 }
260 Err(SchemaError::NotFound(format!(
261 "Version {} not found for subject {}",
262 version, subject
263 )))
264 }
265
266 async fn get_version_state(
267 &self,
268 subject: &Subject,
269 version: SchemaVersion,
270 ) -> SchemaResult<VersionState> {
271 if let Some(entry) = self.subjects.get(&subject.0) {
272 if let Some(v) = entry.iter().find(|v| v.version == version.0 && !v.deleted) {
273 return Ok(v.state);
274 }
275 }
276 Err(SchemaError::NotFound(format!(
277 "Version {} not found for subject {}",
278 version, subject
279 )))
280 }
281
282 async fn list_deleted_subjects(&self) -> SchemaResult<Vec<Subject>> {
283 Ok(self
284 .deleted_subjects
285 .iter()
286 .map(|e| Subject::new(e.key().clone()))
287 .collect())
288 }
289
290 async fn undelete_subject(&self, subject: &Subject) -> SchemaResult<Vec<u32>> {
291 if let Some((key, versions)) = self.deleted_subjects.remove(&subject.0) {
292 if self.subjects.contains_key(&key) {
295 self.deleted_subjects.insert(key, versions);
297 return Err(SchemaError::AlreadyExists(format!(
298 "Cannot undelete subject '{}': a subject with the same name already exists",
299 subject
300 )));
301 }
302 let version_nums: Vec<u32> = versions.iter().map(|v| v.version).collect();
303 self.subjects.insert(key, versions);
305 Ok(version_nums)
306 } else {
307 Err(SchemaError::NotFound(format!(
308 "Subject '{}' not found in deleted subjects",
309 subject
310 )))
311 }
312 }
313}
314
315#[cfg(test)]
316mod tests {
317 use super::*;
318
319 #[tokio::test]
320 async fn test_memory_storage_basic() {
321 let storage = MemoryStorage::new();
322
323 let id = storage.next_schema_id().await.unwrap();
325 let schema = Schema::new(id, SchemaType::Avro, r#"{"type":"string"}"#.to_string());
326 storage.store_schema(schema.clone()).await.unwrap();
327
328 let retrieved = storage.get_schema(id).await.unwrap().unwrap();
330 assert_eq!(retrieved.schema, schema.schema);
331 }
332
333 #[tokio::test]
334 async fn test_memory_storage_subject_versions() {
335 let storage = MemoryStorage::new();
336 let subject = Subject::new("test-subject");
337
338 let id1 = storage.next_schema_id().await.unwrap();
340 let schema1 = Schema::new(id1, SchemaType::Avro, r#"{"type":"string"}"#.to_string());
341 storage.store_schema(schema1).await.unwrap();
342 let v1 = storage
343 .register_subject_version(&subject, id1)
344 .await
345 .unwrap();
346 assert_eq!(v1.0, 1);
347
348 let id2 = storage.next_schema_id().await.unwrap();
350 let schema2 = Schema::new(id2, SchemaType::Avro, r#"{"type":"int"}"#.to_string());
351 storage.store_schema(schema2).await.unwrap();
352 let v2 = storage
353 .register_subject_version(&subject, id2)
354 .await
355 .unwrap();
356 assert_eq!(v2.0, 2);
357
358 let versions = storage.get_versions(&subject).await.unwrap();
360 assert_eq!(versions, vec![1, 2]);
361
362 let latest = storage.get_latest_version(&subject).await.unwrap().unwrap();
364 assert_eq!(latest.version.0, 2);
365 }
366
367 #[tokio::test]
368 async fn test_memory_storage_soft_delete() {
369 let storage = MemoryStorage::new();
370 let subject = Subject::new("test-subject");
371
372 let id = storage.next_schema_id().await.unwrap();
374 let schema = Schema::new(id, SchemaType::Avro, r#"{"type":"string"}"#.to_string());
375 storage.store_schema(schema).await.unwrap();
376 storage
377 .register_subject_version(&subject, id)
378 .await
379 .unwrap();
380
381 storage.delete_subject(&subject, false).await.unwrap();
383
384 let subjects = storage.list_subjects().await.unwrap();
386 assert!(subjects.is_empty());
387
388 let deleted = storage.list_deleted_subjects().await.unwrap();
390 assert_eq!(deleted.len(), 1);
391 assert_eq!(deleted[0].0, "test-subject");
392 }
393
394 #[tokio::test]
395 async fn test_memory_storage_undelete() {
396 let storage = MemoryStorage::new();
397 let subject = Subject::new("test-subject");
398
399 let id = storage.next_schema_id().await.unwrap();
401 let schema = Schema::new(id, SchemaType::Avro, r#"{"type":"string"}"#.to_string());
402 storage.store_schema(schema).await.unwrap();
403 storage
404 .register_subject_version(&subject, id)
405 .await
406 .unwrap();
407
408 let deleted_versions = storage.delete_subject(&subject, false).await.unwrap();
410 assert_eq!(deleted_versions, vec![1]);
411
412 let subjects = storage.list_subjects().await.unwrap();
414 assert!(subjects.is_empty());
415
416 let restored_versions = storage.undelete_subject(&subject).await.unwrap();
418 assert_eq!(restored_versions, vec![1]);
419
420 let subjects = storage.list_subjects().await.unwrap();
422 assert_eq!(subjects.len(), 1);
423 assert_eq!(subjects[0].0, "test-subject");
424
425 let deleted = storage.list_deleted_subjects().await.unwrap();
427 assert!(deleted.is_empty());
428
429 let version = storage
431 .get_subject_version(&subject, SchemaVersion::new(1))
432 .await
433 .unwrap();
434 assert!(version.is_some());
435 }
436
437 #[tokio::test]
438 async fn test_memory_storage_undelete_not_found() {
439 let storage = MemoryStorage::new();
440 let subject = Subject::new("nonexistent");
441
442 let result = storage.undelete_subject(&subject).await;
444 assert!(result.is_err());
445 }
446
447 #[tokio::test]
448 async fn test_memory_storage_permanent_delete_prevents_undelete() {
449 let storage = MemoryStorage::new();
450 let subject = Subject::new("test-subject");
451
452 let id = storage.next_schema_id().await.unwrap();
454 let schema = Schema::new(id, SchemaType::Avro, r#"{"type":"string"}"#.to_string());
455 storage.store_schema(schema).await.unwrap();
456 storage
457 .register_subject_version(&subject, id)
458 .await
459 .unwrap();
460
461 storage.delete_subject(&subject, true).await.unwrap();
463
464 let deleted = storage.list_deleted_subjects().await.unwrap();
466 assert!(deleted.is_empty());
467
468 let result = storage.undelete_subject(&subject).await;
470 assert!(result.is_err());
471 }
472}