Skip to main content

chaincodec_registry/
memory.rs

1//! In-memory `SchemaRegistry` implementation.
2//!
3//! Suitable for testing, CLI use, and embedded deployments.
4//! Thread-safe via `Arc<RwLock<Inner>>`.
5
6use chaincodec_core::{
7    error::RegistryError,
8    event::EventFingerprint,
9    schema::{Schema, SchemaRegistry},
10};
11use std::{
12    collections::HashMap,
13    path::Path,
14    sync::{Arc, RwLock},
15};
16
17use crate::csdl::CsdlParser;
18
19/// Key for schema lookup by name + version.
20#[derive(Debug, Clone, PartialEq, Eq, Hash)]
21struct NameVersion(String, u32);
22
23struct Inner {
24    /// Fingerprint → Schema (latest version)
25    by_fingerprint: HashMap<String, Schema>,
26    /// (name, version) → Schema
27    by_name_version: HashMap<NameVersion, Schema>,
28    /// name → sorted list of versions (ascending)
29    versions: HashMap<String, Vec<u32>>,
30}
31
32impl Inner {
33    fn new() -> Self {
34        Self {
35            by_fingerprint: HashMap::new(),
36            by_name_version: HashMap::new(),
37            versions: HashMap::new(),
38        }
39    }
40
41    fn insert(&mut self, schema: Schema) {
42        // Index by fingerprint
43        self.by_fingerprint
44            .insert(schema.fingerprint.as_hex().to_string(), schema.clone());
45
46        // Track version list
47        let versions = self.versions.entry(schema.name.clone()).or_default();
48        if !versions.contains(&schema.version) {
49            versions.push(schema.version);
50            versions.sort_unstable();
51        }
52
53        // Index by name + version
54        self.by_name_version
55            .insert(NameVersion(schema.name.clone(), schema.version), schema);
56    }
57
58    fn latest_version(&self, name: &str) -> Option<u32> {
59        self.versions
60            .get(name)?
61            .iter()
62            .rev()
63            .find(|&&v| {
64                self.by_name_version
65                    .get(&NameVersion(name.to_string(), v))
66                    .map(|s| !s.deprecated)
67                    .unwrap_or(false)
68            })
69            .copied()
70    }
71}
72
73/// Thread-safe in-memory schema registry.
74#[derive(Clone)]
75pub struct MemoryRegistry {
76    inner: Arc<RwLock<Inner>>,
77}
78
79impl MemoryRegistry {
80    pub fn new() -> Self {
81        Self {
82            inner: Arc::new(RwLock::new(Inner::new())),
83        }
84    }
85
86    /// Add a schema to the registry.
87    pub fn add(&self, schema: Schema) -> Result<(), RegistryError> {
88        let mut inner = self.inner.write().unwrap();
89        let key = NameVersion(schema.name.clone(), schema.version);
90        if inner.by_name_version.contains_key(&key) {
91            return Err(RegistryError::AlreadyExists {
92                name: schema.name.clone(),
93                version: schema.version,
94            });
95        }
96        inner.insert(schema);
97        Ok(())
98    }
99
100    /// Load all `.csdl` files from a directory recursively.
101    ///
102    /// Each file may contain multiple schema documents (separated by `---`).
103    /// Returns the total number of schema versions loaded.
104    pub fn load_directory(&self, dir: &Path) -> Result<usize, RegistryError> {
105        let mut count = 0;
106        for entry in walkdir_csdl(dir)? {
107            let content = std::fs::read_to_string(&entry).map_err(RegistryError::Io)?;
108            for schema in CsdlParser::parse_all(&content)? {
109                self.add(schema)?;
110                count += 1;
111            }
112        }
113        Ok(count)
114    }
115
116    /// Load a single CSDL file.
117    ///
118    /// If the file contains multiple schemas (separated by `---`), all are
119    /// added to the registry. Returns the count of schemas loaded.
120    pub fn load_file(&self, path: &Path) -> Result<usize, RegistryError> {
121        let content = std::fs::read_to_string(path).map_err(RegistryError::Io)?;
122        let schemas = CsdlParser::parse_all(&content)?;
123        if schemas.is_empty() {
124            return Err(RegistryError::ParseError("empty CSDL file".into()));
125        }
126        let count = schemas.len();
127        for schema in schemas {
128            self.add(schema)?;
129        }
130        Ok(count)
131    }
132
133    /// Alias for `add()` — adds a schema to the registry.
134    pub fn insert(&self, schema: Schema) -> Result<(), RegistryError> {
135        self.add(schema)
136    }
137
138    /// Returns the total number of schema versions stored.
139    pub fn len(&self) -> usize {
140        self.inner.read().unwrap().by_name_version.len()
141    }
142
143    pub fn is_empty(&self) -> bool {
144        self.len() == 0
145    }
146
147    /// Returns all schema names (deduplicated, one entry per name regardless of versions).
148    pub fn all_names(&self) -> Vec<String> {
149        let inner = self.inner.read().unwrap();
150        let mut names: Vec<String> = inner.versions.keys().cloned().collect();
151        names.sort();
152        names
153    }
154
155    /// Returns all schemas (latest non-deprecated version of each).
156    pub fn all_schemas(&self) -> Vec<Schema> {
157        let inner = self.inner.read().unwrap();
158        let mut schemas = Vec::new();
159        for name in inner.versions.keys() {
160            if let Some(v) = inner.latest_version(name) {
161                if let Some(s) = inner.by_name_version.get(&NameVersion(name.clone(), v)) {
162                    schemas.push(s.clone());
163                }
164            }
165        }
166        schemas
167    }
168}
169
170impl Default for MemoryRegistry {
171    fn default() -> Self {
172        Self::new()
173    }
174}
175
176impl SchemaRegistry for MemoryRegistry {
177    fn get_by_fingerprint(&self, fp: &EventFingerprint) -> Option<Schema> {
178        self.inner
179            .read()
180            .unwrap()
181            .by_fingerprint
182            .get(fp.as_hex())
183            .cloned()
184    }
185
186    fn get_by_name(&self, name: &str, version: Option<u32>) -> Option<Schema> {
187        let inner = self.inner.read().unwrap();
188        let v = match version {
189            Some(v) => v,
190            None => inner.latest_version(name)?,
191        };
192        inner
193            .by_name_version
194            .get(&NameVersion(name.to_string(), v))
195            .cloned()
196    }
197
198    fn list_for_chain(&self, chain_slug: &str) -> Vec<Schema> {
199        self.inner
200            .read()
201            .unwrap()
202            .by_name_version
203            .values()
204            .filter(|s| s.chains.iter().any(|c| c == chain_slug))
205            .cloned()
206            .collect()
207    }
208
209    fn history(&self, name: &str) -> Vec<Schema> {
210        let inner = self.inner.read().unwrap();
211        let versions = match inner.versions.get(name) {
212            Some(v) => v.clone(),
213            None => return Vec::new(),
214        };
215        versions
216            .iter()
217            .filter_map(|&v| {
218                inner
219                    .by_name_version
220                    .get(&NameVersion(name.to_string(), v))
221                    .cloned()
222            })
223            .collect()
224    }
225}
226
227/// Collect all `.csdl` files under `dir` recursively.
228fn walkdir_csdl(dir: &Path) -> Result<Vec<std::path::PathBuf>, RegistryError> {
229    let mut files = Vec::new();
230    if !dir.is_dir() {
231        return Err(RegistryError::Io(std::io::Error::new(
232            std::io::ErrorKind::NotFound,
233            format!("{} is not a directory", dir.display()),
234        )));
235    }
236    for entry in std::fs::read_dir(dir).map_err(RegistryError::Io)? {
237        let entry = entry.map_err(RegistryError::Io)?;
238        let path = entry.path();
239        if path.is_dir() {
240            files.extend(walkdir_csdl(&path)?);
241        } else if path.extension().map(|e| e == "csdl").unwrap_or(false) {
242            files.push(path);
243        }
244    }
245    Ok(files)
246}
247
248#[cfg(test)]
249mod tests {
250    use super::*;
251
252    fn make_schema(name: &str, version: u32, fingerprint: &str) -> Schema {
253        use chaincodec_core::schema::SchemaMeta;
254        Schema {
255            name: name.to_string(),
256            version,
257            chains: vec!["ethereum".into()],
258            address: None,
259            event: "Transfer".into(),
260            fingerprint: EventFingerprint::new(fingerprint),
261            supersedes: None,
262            superseded_by: None,
263            deprecated: false,
264            fields: vec![],
265            meta: SchemaMeta::default(),
266        }
267    }
268
269    #[test]
270    fn add_and_lookup() {
271        let reg = MemoryRegistry::new();
272        let schema = make_schema(
273            "ERC20Transfer",
274            1,
275            "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
276        );
277        reg.add(schema.clone()).unwrap();
278
279        let found = reg.get_by_fingerprint(&EventFingerprint::new(
280            "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
281        ));
282        assert!(found.is_some());
283        assert_eq!(found.unwrap().name, "ERC20Transfer");
284    }
285
286    #[test]
287    fn duplicate_rejected() {
288        let reg = MemoryRegistry::new();
289        let s = make_schema("ERC20Transfer", 1, "0xabc");
290        reg.add(s.clone()).unwrap();
291        let err = reg.add(s);
292        assert!(err.is_err());
293    }
294
295    #[test]
296    fn latest_version_skips_deprecated() {
297        let reg = MemoryRegistry::new();
298        let mut s1 = make_schema("Foo", 1, "0x01");
299        let s2 = make_schema("Foo", 2, "0x02");
300        s1.deprecated = true;
301        reg.add(s1).unwrap();
302        reg.add(s2).unwrap();
303
304        let found = reg.get_by_name("Foo", None).unwrap();
305        assert_eq!(found.version, 2);
306    }
307}