1use super::StorageBackend;
4use crate::error::{SchemaError, SchemaResult};
5use crate::types::{
6 Schema, SchemaId, SchemaType, SchemaVersion, Subject, SubjectVersion, VersionState,
7};
8use async_trait::async_trait;
9use dashmap::DashMap;
10use std::sync::atomic::{AtomicU32, Ordering};
11
/// In-memory storage for schemas and subject versions, backed by concurrent
/// `DashMap`s. Nothing is persisted; all data lives only in this value.
pub struct MemoryStorage {
    /// Counter for the next schema id to hand out; initialised to 1 in `new`.
    next_id: AtomicU32,
    /// Stored schemas keyed by their numeric schema id.
    schemas: DashMap<u32, Schema>,
    /// Maps a schema fingerprint to the id of the schema stored with it.
    fingerprints: DashMap<String, u32>,
    /// Active subjects, keyed by subject name, each with its list of version entries.
    subjects: DashMap<String, Vec<SubjectVersionEntry>>,
    /// Soft-deleted subjects, kept here until they are undeleted or permanently deleted.
    deleted_subjects: DashMap<String, Vec<SubjectVersionEntry>>,
}
25
/// One registered version of a subject, as stored inside `MemoryStorage`.
#[derive(Clone)]
struct SubjectVersionEntry {
    /// Version number assigned when the entry was registered.
    version: u32,
    /// Numeric id of the schema this version refers to.
    schema_id: u32,
    /// Schema type copied from the referenced schema at registration time.
    schema_type: SchemaType,
    /// Soft-delete flag: entries with `deleted == true` are skipped by the read paths.
    deleted: bool,
    /// Current state of this version; set to `VersionState::Enabled` on registration.
    state: VersionState,
}
34
35impl MemoryStorage {
36 pub fn new() -> Self {
37 Self {
38 next_id: AtomicU32::new(1),
39 schemas: DashMap::new(),
40 fingerprints: DashMap::new(),
41 subjects: DashMap::new(),
42 deleted_subjects: DashMap::new(),
43 }
44 }
45}
46
47impl Default for MemoryStorage {
48 fn default() -> Self {
49 Self::new()
50 }
51}
52
53#[async_trait]
54impl StorageBackend for MemoryStorage {
55 async fn store_schema(&self, schema: Schema) -> SchemaResult<SchemaId> {
56 let id = schema.id.0;
57 self.schemas.insert(id, schema.clone());
58
59 if let Some(ref fp) = schema.fingerprint {
60 self.fingerprints.insert(fp.clone(), id);
61 }
62
63 Ok(schema.id)
64 }
65
66 async fn get_schema(&self, id: SchemaId) -> SchemaResult<Option<Schema>> {
67 Ok(self.schemas.get(&id.0).map(|s| s.clone()))
68 }
69
70 async fn get_schema_by_fingerprint(&self, fingerprint: &str) -> SchemaResult<Option<Schema>> {
71 if let Some(id) = self.fingerprints.get(fingerprint) {
72 self.get_schema(SchemaId::new(*id)).await
73 } else {
74 Ok(None)
75 }
76 }
77
78 async fn register_subject_version(
79 &self,
80 subject: &Subject,
81 schema_id: SchemaId,
82 ) -> SchemaResult<SchemaVersion> {
83 let schema = self
84 .get_schema(schema_id)
85 .await?
86 .ok_or_else(|| SchemaError::NotFound(format!("Schema ID {}", schema_id)))?;
87
88 let mut entry = self.subjects.entry(subject.0.clone()).or_default();
89 let version = entry.len() as u32 + 1;
90
91 entry.push(SubjectVersionEntry {
92 version,
93 schema_id: schema_id.0,
94 schema_type: schema.schema_type,
95 deleted: false,
96 state: VersionState::Enabled,
97 });
98
99 Ok(SchemaVersion::new(version))
100 }
101
102 async fn get_versions(&self, subject: &Subject) -> SchemaResult<Vec<u32>> {
103 let entry = self.subjects.get(&subject.0);
104 match entry {
105 Some(versions) => Ok(versions
106 .iter()
107 .filter(|v| !v.deleted)
108 .map(|v| v.version)
109 .collect()),
110 None => Ok(Vec::new()),
111 }
112 }
113
114 async fn get_subject_version(
115 &self,
116 subject: &Subject,
117 version: SchemaVersion,
118 ) -> SchemaResult<Option<SubjectVersion>> {
119 let entry = self.subjects.get(&subject.0);
120 match entry {
121 Some(versions) => {
122 let v = versions
123 .iter()
124 .find(|v| v.version == version.0 && !v.deleted);
125
126 match v {
127 Some(entry) => {
128 let schema = self
129 .get_schema(SchemaId::new(entry.schema_id))
130 .await?
131 .ok_or_else(|| {
132 SchemaError::NotFound(format!("Schema ID {}", entry.schema_id))
133 })?;
134
135 Ok(Some(SubjectVersion {
136 subject: subject.clone(),
137 version: SchemaVersion::new(entry.version),
138 id: SchemaId::new(entry.schema_id),
139 schema_type: entry.schema_type,
140 schema: schema.schema,
141 state: entry.state,
142 }))
143 }
144 None => Ok(None),
145 }
146 }
147 None => Ok(None),
148 }
149 }
150
151 async fn get_latest_version(&self, subject: &Subject) -> SchemaResult<Option<SubjectVersion>> {
152 let entry = self.subjects.get(&subject.0);
153 match entry {
154 Some(versions) => {
155 let latest = versions.iter().rev().find(|v| !v.deleted);
156
157 match latest {
158 Some(entry) => {
159 let schema = self
160 .get_schema(SchemaId::new(entry.schema_id))
161 .await?
162 .ok_or_else(|| {
163 SchemaError::NotFound(format!("Schema ID {}", entry.schema_id))
164 })?;
165
166 Ok(Some(SubjectVersion {
167 subject: subject.clone(),
168 version: SchemaVersion::new(entry.version),
169 id: SchemaId::new(entry.schema_id),
170 schema_type: entry.schema_type,
171 schema: schema.schema,
172 state: entry.state,
173 }))
174 }
175 None => Ok(None),
176 }
177 }
178 None => Ok(None),
179 }
180 }
181
182 async fn list_subjects(&self) -> SchemaResult<Vec<Subject>> {
183 Ok(self
184 .subjects
185 .iter()
186 .filter(|e| e.iter().any(|v| !v.deleted))
187 .map(|e| Subject::new(e.key().clone()))
188 .collect())
189 }
190
191 async fn delete_subject(&self, subject: &Subject, permanent: bool) -> SchemaResult<Vec<u32>> {
192 if permanent {
193 self.deleted_subjects.remove(&subject.0);
195 if let Some((_, versions)) = self.subjects.remove(&subject.0) {
196 Ok(versions.iter().map(|v| v.version).collect())
197 } else {
198 Ok(Vec::new())
199 }
200 } else {
201 if let Some((key, versions)) = self.subjects.remove(&subject.0) {
203 let version_nums: Vec<u32> = versions.iter().map(|v| v.version).collect();
204 self.deleted_subjects.insert(key, versions);
206 Ok(version_nums)
207 } else {
208 Ok(Vec::new())
209 }
210 }
211 }
212
213 async fn delete_version(
214 &self,
215 subject: &Subject,
216 version: SchemaVersion,
217 permanent: bool,
218 ) -> SchemaResult<()> {
219 if let Some(mut entry) = self.subjects.get_mut(&subject.0) {
220 if permanent {
221 entry.retain(|v| v.version != version.0);
222 } else if let Some(v) = entry.iter_mut().find(|v| v.version == version.0) {
223 v.deleted = true;
224 }
225 }
226 Ok(())
227 }
228
229 async fn next_schema_id(&self) -> SchemaResult<SchemaId> {
230 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
231 Ok(SchemaId::new(id))
232 }
233
234 async fn subject_exists(&self, subject: &Subject) -> SchemaResult<bool> {
235 Ok(self.subjects.contains_key(&subject.0))
236 }
237
238 async fn set_version_state(
239 &self,
240 subject: &Subject,
241 version: SchemaVersion,
242 state: VersionState,
243 ) -> SchemaResult<()> {
244 if let Some(mut entry) = self.subjects.get_mut(&subject.0) {
245 if let Some(v) = entry
246 .iter_mut()
247 .find(|v| v.version == version.0 && !v.deleted)
248 {
249 v.state = state;
250 return Ok(());
251 }
252 }
253 Err(SchemaError::NotFound(format!(
254 "Version {} not found for subject {}",
255 version, subject
256 )))
257 }
258
259 async fn get_version_state(
260 &self,
261 subject: &Subject,
262 version: SchemaVersion,
263 ) -> SchemaResult<VersionState> {
264 if let Some(entry) = self.subjects.get(&subject.0) {
265 if let Some(v) = entry.iter().find(|v| v.version == version.0 && !v.deleted) {
266 return Ok(v.state);
267 }
268 }
269 Err(SchemaError::NotFound(format!(
270 "Version {} not found for subject {}",
271 version, subject
272 )))
273 }
274
275 async fn list_deleted_subjects(&self) -> SchemaResult<Vec<Subject>> {
276 Ok(self
277 .deleted_subjects
278 .iter()
279 .map(|e| Subject::new(e.key().clone()))
280 .collect())
281 }
282
283 async fn undelete_subject(&self, subject: &Subject) -> SchemaResult<Vec<u32>> {
284 if let Some((key, versions)) = self.deleted_subjects.remove(&subject.0) {
285 let version_nums: Vec<u32> = versions.iter().map(|v| v.version).collect();
286 self.subjects.insert(key, versions);
288 Ok(version_nums)
289 } else {
290 Err(SchemaError::NotFound(format!(
291 "Subject '{}' not found in deleted subjects",
292 subject
293 )))
294 }
295 }
296}
297
#[cfg(test)]
mod tests {
    use super::*;

    /// Avro definition used by most tests.
    const STRING_SCHEMA: &str = r#"{"type":"string"}"#;

    /// Allocates a fresh id, stores an Avro schema with `definition` under it,
    /// and returns that id.
    async fn store_avro(storage: &MemoryStorage, definition: &str) -> SchemaId {
        let id = storage.next_schema_id().await.unwrap();
        let schema = Schema::new(id, SchemaType::Avro, definition.to_string());
        storage.store_schema(schema).await.unwrap();
        id
    }

    /// Builds a storage containing subject "test-subject" with one registered version.
    async fn storage_with_one_version() -> (MemoryStorage, Subject) {
        let storage = MemoryStorage::new();
        let subject = Subject::new("test-subject");

        let id = store_avro(&storage, STRING_SCHEMA).await;
        storage
            .register_subject_version(&subject, id)
            .await
            .unwrap();

        (storage, subject)
    }

    /// A stored schema can be read back by its id.
    #[tokio::test]
    async fn test_memory_storage_basic() {
        let storage = MemoryStorage::new();

        let id = storage.next_schema_id().await.unwrap();
        let original = Schema::new(id, SchemaType::Avro, STRING_SCHEMA.to_string());
        storage.store_schema(original.clone()).await.unwrap();

        let fetched = storage.get_schema(id).await.unwrap().unwrap();
        assert_eq!(fetched.schema, original.schema);
    }

    /// Successive registrations get versions 1 and 2; the latest is version 2.
    #[tokio::test]
    async fn test_memory_storage_subject_versions() {
        let storage = MemoryStorage::new();
        let subject = Subject::new("test-subject");

        let first_id = store_avro(&storage, STRING_SCHEMA).await;
        let first = storage
            .register_subject_version(&subject, first_id)
            .await
            .unwrap();
        assert_eq!(first.0, 1);

        let second_id = store_avro(&storage, r#"{"type":"int"}"#).await;
        let second = storage
            .register_subject_version(&subject, second_id)
            .await
            .unwrap();
        assert_eq!(second.0, 2);

        let all = storage.get_versions(&subject).await.unwrap();
        assert_eq!(all, vec![1, 2]);

        let newest = storage.get_latest_version(&subject).await.unwrap().unwrap();
        assert_eq!(newest.version.0, 2);
    }

    /// A soft delete hides the subject from the active list and exposes it in
    /// the deleted list.
    #[tokio::test]
    async fn test_memory_storage_soft_delete() {
        let (storage, subject) = storage_with_one_version().await;

        storage.delete_subject(&subject, false).await.unwrap();

        let active = storage.list_subjects().await.unwrap();
        assert!(active.is_empty());

        let parked = storage.list_deleted_subjects().await.unwrap();
        assert_eq!(parked.len(), 1);
        assert_eq!(parked[0].0, "test-subject");
    }

    /// Undeleting a soft-deleted subject restores it and its versions.
    #[tokio::test]
    async fn test_memory_storage_undelete() {
        let (storage, subject) = storage_with_one_version().await;

        let removed = storage.delete_subject(&subject, false).await.unwrap();
        assert_eq!(removed, vec![1]);

        let active = storage.list_subjects().await.unwrap();
        assert!(active.is_empty());

        let restored = storage.undelete_subject(&subject).await.unwrap();
        assert_eq!(restored, vec![1]);

        let active = storage.list_subjects().await.unwrap();
        assert_eq!(active.len(), 1);
        assert_eq!(active[0].0, "test-subject");

        let parked = storage.list_deleted_subjects().await.unwrap();
        assert!(parked.is_empty());

        let version = storage
            .get_subject_version(&subject, SchemaVersion::new(1))
            .await
            .unwrap();
        assert!(version.is_some());
    }

    /// Undeleting a subject that was never soft-deleted is an error.
    #[tokio::test]
    async fn test_memory_storage_undelete_not_found() {
        let storage = MemoryStorage::new();
        let subject = Subject::new("nonexistent");

        let outcome = storage.undelete_subject(&subject).await;
        assert!(outcome.is_err());
    }

    /// After a permanent delete nothing is left to undelete.
    #[tokio::test]
    async fn test_memory_storage_permanent_delete_prevents_undelete() {
        let (storage, subject) = storage_with_one_version().await;

        storage.delete_subject(&subject, true).await.unwrap();

        let parked = storage.list_deleted_subjects().await.unwrap();
        assert!(parked.is_empty());

        let outcome = storage.undelete_subject(&subject).await;
        assert!(outcome.is_err());
    }
}