pjson_rs/infrastructure/
schema_repository.rs

1//! Schema repository implementation
2//!
3//! Provides thread-safe in-memory storage for schema definitions.
4
5use dashmap::DashMap;
6use std::sync::Arc;
7
8use crate::{
9    ApplicationError, ApplicationResult,
10    domain::value_objects::{Schema, SchemaId},
11};
12
/// Thread-safe in-memory schema repository
///
/// Stores schema definitions with concurrent read/write access using `DashMap`.
/// Entries are keyed by the string form of their `SchemaId` and each definition
/// is held behind an `Arc`.
///
/// # Design Philosophy
/// - Concurrent access through `DashMap`, a sharded and internally synchronized
///   hash map (callers never take an explicit lock)
/// - Cheap schema retrieval: lookups hand out `Arc` clones instead of copying
///   the `Schema` value
/// - Type-safe schema identifiers
/// - Simple CRUD operations
///
/// # Examples
/// ```
/// # use pjson_rs::SchemaRepository;
/// # use pjson_rs::domain::value_objects::{Schema, SchemaId};
/// let repo = SchemaRepository::new();
/// let schema_id = SchemaId::new("user-v1");
/// let schema = Schema::integer(Some(1), Some(100));
///
/// repo.store(schema_id.clone(), schema).unwrap();
/// let retrieved = repo.get(&schema_id).unwrap();
/// ```
pub struct SchemaRepository {
    // Shared handle to the backing map. Cloning the repository clones this
    // `Arc`, so every clone reads and writes the same set of schemas.
    schemas: Arc<DashMap<String, Arc<Schema>>>,
}
38
39impl SchemaRepository {
40    /// Create a new empty schema repository
41    pub fn new() -> Self {
42        Self {
43            schemas: Arc::new(DashMap::new()),
44        }
45    }
46
47    /// Store a schema definition
48    ///
49    /// # Arguments
50    /// * `id` - Unique schema identifier
51    /// * `schema` - Schema definition to store
52    ///
53    /// # Returns
54    /// `Ok(())` if stored successfully
55    ///
56    /// # Errors
57    /// Returns error if schema with same ID already exists (use `update` instead)
58    pub fn store(&self, id: SchemaId, schema: Schema) -> ApplicationResult<()> {
59        let key = id.as_str().to_string();
60
61        if self.schemas.contains_key(&key) {
62            return Err(ApplicationError::Conflict(format!(
63                "Schema with ID '{}' already exists",
64                id
65            )));
66        }
67
68        self.schemas.insert(key, Arc::new(schema));
69        Ok(())
70    }
71
72    /// Update an existing schema definition
73    ///
74    /// # Arguments
75    /// * `id` - Schema identifier
76    /// * `schema` - New schema definition
77    ///
78    /// # Returns
79    /// `Ok(previous_schema)` with the replaced schema if it existed
80    ///
81    /// # Errors
82    /// Returns error if schema does not exist (use `store` instead)
83    pub fn update(&self, id: SchemaId, schema: Schema) -> ApplicationResult<Arc<Schema>> {
84        let key = id.as_str().to_string();
85
86        match self.schemas.insert(key.clone(), Arc::new(schema)) {
87            Some(previous) => Ok(previous),
88            None => Err(ApplicationError::NotFound(format!(
89                "Schema with ID '{}' not found",
90                id
91            ))),
92        }
93    }
94
95    /// Store or update a schema (upsert operation)
96    ///
97    /// # Arguments
98    /// * `id` - Schema identifier
99    /// * `schema` - Schema definition
100    ///
101    /// # Returns
102    /// `Ok(Some(previous))` if schema was updated, `Ok(None)` if newly created
103    pub fn store_or_update(
104        &self,
105        id: SchemaId,
106        schema: Schema,
107    ) -> ApplicationResult<Option<Arc<Schema>>> {
108        let key = id.as_str().to_string();
109        let previous = self.schemas.insert(key, Arc::new(schema));
110        Ok(previous)
111    }
112
113    /// Retrieve a schema by ID
114    ///
115    /// # Arguments
116    /// * `id` - Schema identifier
117    ///
118    /// # Returns
119    /// `Ok(Arc<Schema>)` if schema exists
120    ///
121    /// # Errors
122    /// Returns error if schema not found
123    pub fn get(&self, id: &SchemaId) -> ApplicationResult<Arc<Schema>> {
124        let key = id.as_str();
125
126        self.schemas
127            .get(key)
128            .map(|entry| Arc::clone(entry.value()))
129            .ok_or_else(|| ApplicationError::NotFound(format!("Schema with ID '{}' not found", id)))
130    }
131
132    /// Check if a schema exists
133    ///
134    /// # Arguments
135    /// * `id` - Schema identifier
136    ///
137    /// # Returns
138    /// `true` if schema exists, `false` otherwise
139    pub fn exists(&self, id: &SchemaId) -> bool {
140        self.schemas.contains_key(id.as_str())
141    }
142
143    /// Delete a schema by ID
144    ///
145    /// # Arguments
146    /// * `id` - Schema identifier
147    ///
148    /// # Returns
149    /// `Ok(schema)` with the deleted schema if it existed
150    ///
151    /// # Errors
152    /// Returns error if schema not found
153    pub fn delete(&self, id: &SchemaId) -> ApplicationResult<Arc<Schema>> {
154        let key = id.as_str();
155
156        self.schemas
157            .remove(key)
158            .map(|(_, schema)| schema)
159            .ok_or_else(|| ApplicationError::NotFound(format!("Schema with ID '{}' not found", id)))
160    }
161
162    /// List all schema IDs
163    ///
164    /// # Returns
165    /// Vector of all schema IDs in the repository
166    pub fn list_ids(&self) -> Vec<SchemaId> {
167        self.schemas
168            .iter()
169            .map(|entry| SchemaId::new(entry.key().clone()))
170            .collect()
171    }
172
173    /// Get number of schemas stored
174    ///
175    /// # Returns
176    /// Count of schemas in repository
177    pub fn count(&self) -> usize {
178        self.schemas.len()
179    }
180
181    /// Clear all schemas
182    ///
183    /// Removes all stored schemas from the repository.
184    pub fn clear(&self) {
185        self.schemas.clear();
186    }
187}
188
189impl Default for SchemaRepository {
190    fn default() -> Self {
191        Self::new()
192    }
193}
194
195impl Clone for SchemaRepository {
196    fn clone(&self) -> Self {
197        Self {
198            schemas: Arc::clone(&self.schemas),
199        }
200    }
201}
202
#[cfg(test)]
mod tests {
    use super::*;

    /// Small integer schema shared by the tests below.
    fn create_test_schema() -> Schema {
        Schema::integer(Some(0), Some(100))
    }

    #[test]
    fn test_store_and_retrieve() {
        let repo = SchemaRepository::new();
        let id = SchemaId::new("test-schema");
        let schema = create_test_schema();

        repo.store(id.clone(), schema).unwrap();
        assert!(repo.exists(&id));

        let retrieved = repo.get(&id).unwrap();
        assert!(matches!(*retrieved, Schema::Integer { .. }));
    }

    #[test]
    fn test_store_duplicate_fails() {
        let repo = SchemaRepository::new();
        let id = SchemaId::new("test-schema");
        let schema = create_test_schema();

        repo.store(id.clone(), schema.clone()).unwrap();
        let result = repo.store(id, schema);

        assert!(result.is_err());
        // The rejected duplicate must not add a second entry.
        assert_eq!(repo.count(), 1);
    }

    #[test]
    fn test_update() {
        let repo = SchemaRepository::new();
        let id = SchemaId::new("test-schema");
        let schema1 = create_test_schema();
        let schema2 = Schema::string(Some(1), Some(100));

        repo.store(id.clone(), schema1).unwrap();
        let previous = repo.update(id.clone(), schema2).unwrap();

        assert!(matches!(*previous, Schema::Integer { .. }));

        let current = repo.get(&id).unwrap();
        assert!(matches!(*current, Schema::String { .. }));
    }

    #[test]
    fn test_update_missing_fails() {
        let repo = SchemaRepository::new();

        let result = repo.update(SchemaId::new("missing"), create_test_schema());

        assert!(result.is_err());
    }

    #[test]
    fn test_store_or_update() {
        let repo = SchemaRepository::new();
        let id = SchemaId::new("test-schema");
        let schema1 = create_test_schema();
        let schema2 = Schema::string(Some(1), Some(100));

        // First insert
        let result = repo.store_or_update(id.clone(), schema1).unwrap();
        assert!(result.is_none());

        // Update
        let result = repo.store_or_update(id.clone(), schema2).unwrap();
        assert!(result.is_some());
        assert!(matches!(*result.unwrap(), Schema::Integer { .. }));
    }

    #[test]
    fn test_delete() {
        let repo = SchemaRepository::new();
        let id = SchemaId::new("test-schema");
        let schema = create_test_schema();

        repo.store(id.clone(), schema).unwrap();
        assert!(repo.exists(&id));

        let deleted = repo.delete(&id).unwrap();
        assert!(matches!(*deleted, Schema::Integer { .. }));
        assert!(!repo.exists(&id));
    }

    #[test]
    fn test_get_and_delete_missing_fail() {
        let repo = SchemaRepository::new();
        let id = SchemaId::new("missing");

        assert!(!repo.exists(&id));
        assert!(repo.get(&id).is_err());
        assert!(repo.delete(&id).is_err());
    }

    #[test]
    fn test_list_ids() {
        let repo = SchemaRepository::new();

        repo.store(SchemaId::new("schema1"), create_test_schema())
            .unwrap();
        repo.store(SchemaId::new("schema2"), create_test_schema())
            .unwrap();

        let ids = repo.list_ids();
        assert_eq!(ids.len(), 2);
        assert!(ids.iter().any(|id| id.as_str() == "schema1"));
        assert!(ids.iter().any(|id| id.as_str() == "schema2"));
    }

    #[test]
    fn test_count_and_clear() {
        let repo = SchemaRepository::new();

        repo.store(SchemaId::new("schema1"), create_test_schema())
            .unwrap();
        repo.store(SchemaId::new("schema2"), create_test_schema())
            .unwrap();

        assert_eq!(repo.count(), 2);

        repo.clear();
        assert_eq!(repo.count(), 0);
    }

    #[test]
    fn test_clone_shares_storage() {
        let repo = SchemaRepository::new();
        let other = repo.clone();
        let id = SchemaId::new("shared");

        repo.store(id.clone(), create_test_schema()).unwrap();

        // Both handles point at the same map, so the clone sees the write.
        assert!(other.exists(&id));
        assert_eq!(other.count(), 1);
    }

    #[test]
    fn test_concurrent_access() {
        use std::thread;

        let repo = SchemaRepository::new();
        let repo_clone = repo.clone();

        let handle = thread::spawn(move || {
            for i in 0..100 {
                let id = SchemaId::new(format!("schema-{i}"));
                repo_clone.store(id, create_test_schema()).ok(); // Some may fail due to duplicates
            }
        });

        for i in 0..100 {
            let id = SchemaId::new(format!("schema-{i}"));
            repo.store(id, create_test_schema()).ok();
        }

        handle.join().unwrap();

        // Both threads attempt the same 100 IDs, so whichever thread wins each
        // race, every ID ends up stored exactly once.
        assert_eq!(repo.count(), 100);
    }
}