chaincodec_registry/
memory.rs1use chaincodec_core::{
7 error::RegistryError,
8 event::EventFingerprint,
9 schema::{Schema, SchemaRegistry},
10};
11use std::{
12 collections::HashMap,
13 path::Path,
14 sync::{Arc, RwLock},
15};
16
17use crate::csdl::CsdlParser;
18
19#[derive(Debug, Clone, PartialEq, Eq, Hash)]
21struct NameVersion(String, u32);
22
23struct Inner {
24 by_fingerprint: HashMap<String, Schema>,
26 by_name_version: HashMap<NameVersion, Schema>,
28 versions: HashMap<String, Vec<u32>>,
30}
31
32impl Inner {
33 fn new() -> Self {
34 Self {
35 by_fingerprint: HashMap::new(),
36 by_name_version: HashMap::new(),
37 versions: HashMap::new(),
38 }
39 }
40
41 fn insert(&mut self, schema: Schema) {
42 self.by_fingerprint
44 .insert(schema.fingerprint.as_hex().to_string(), schema.clone());
45
46 let versions = self.versions.entry(schema.name.clone()).or_default();
48 if !versions.contains(&schema.version) {
49 versions.push(schema.version);
50 versions.sort_unstable();
51 }
52
53 self.by_name_version
55 .insert(NameVersion(schema.name.clone(), schema.version), schema);
56 }
57
58 fn latest_version(&self, name: &str) -> Option<u32> {
59 self.versions
60 .get(name)?
61 .iter()
62 .rev()
63 .find(|&&v| {
64 self.by_name_version
65 .get(&NameVersion(name.to_string(), v))
66 .map(|s| !s.deprecated)
67 .unwrap_or(false)
68 })
69 .copied()
70 }
71}
72
73#[derive(Clone)]
75pub struct MemoryRegistry {
76 inner: Arc<RwLock<Inner>>,
77}
78
79impl MemoryRegistry {
80 pub fn new() -> Self {
81 Self {
82 inner: Arc::new(RwLock::new(Inner::new())),
83 }
84 }
85
86 pub fn add(&self, schema: Schema) -> Result<(), RegistryError> {
88 let mut inner = self.inner.write().unwrap();
89 let key = NameVersion(schema.name.clone(), schema.version);
90 if inner.by_name_version.contains_key(&key) {
91 return Err(RegistryError::AlreadyExists {
92 name: schema.name.clone(),
93 version: schema.version,
94 });
95 }
96 inner.insert(schema);
97 Ok(())
98 }
99
100 pub fn load_directory(&self, dir: &Path) -> Result<usize, RegistryError> {
105 let mut count = 0;
106 for entry in walkdir_csdl(dir)? {
107 let content = std::fs::read_to_string(&entry).map_err(RegistryError::Io)?;
108 for schema in CsdlParser::parse_all(&content)? {
109 self.add(schema)?;
110 count += 1;
111 }
112 }
113 Ok(count)
114 }
115
116 pub fn load_file(&self, path: &Path) -> Result<usize, RegistryError> {
121 let content = std::fs::read_to_string(path).map_err(RegistryError::Io)?;
122 let schemas = CsdlParser::parse_all(&content)?;
123 if schemas.is_empty() {
124 return Err(RegistryError::ParseError("empty CSDL file".into()));
125 }
126 let count = schemas.len();
127 for schema in schemas {
128 self.add(schema)?;
129 }
130 Ok(count)
131 }
132
133 pub fn insert(&self, schema: Schema) -> Result<(), RegistryError> {
135 self.add(schema)
136 }
137
138 pub fn len(&self) -> usize {
140 self.inner.read().unwrap().by_name_version.len()
141 }
142
143 pub fn is_empty(&self) -> bool {
144 self.len() == 0
145 }
146
147 pub fn all_names(&self) -> Vec<String> {
149 let inner = self.inner.read().unwrap();
150 let mut names: Vec<String> = inner.versions.keys().cloned().collect();
151 names.sort();
152 names
153 }
154
155 pub fn all_schemas(&self) -> Vec<Schema> {
157 let inner = self.inner.read().unwrap();
158 let mut schemas = Vec::new();
159 for name in inner.versions.keys() {
160 if let Some(v) = inner.latest_version(name) {
161 if let Some(s) = inner.by_name_version.get(&NameVersion(name.clone(), v)) {
162 schemas.push(s.clone());
163 }
164 }
165 }
166 schemas
167 }
168}
169
170impl Default for MemoryRegistry {
171 fn default() -> Self {
172 Self::new()
173 }
174}
175
176impl SchemaRegistry for MemoryRegistry {
177 fn get_by_fingerprint(&self, fp: &EventFingerprint) -> Option<Schema> {
178 self.inner
179 .read()
180 .unwrap()
181 .by_fingerprint
182 .get(fp.as_hex())
183 .cloned()
184 }
185
186 fn get_by_name(&self, name: &str, version: Option<u32>) -> Option<Schema> {
187 let inner = self.inner.read().unwrap();
188 let v = match version {
189 Some(v) => v,
190 None => inner.latest_version(name)?,
191 };
192 inner
193 .by_name_version
194 .get(&NameVersion(name.to_string(), v))
195 .cloned()
196 }
197
198 fn list_for_chain(&self, chain_slug: &str) -> Vec<Schema> {
199 self.inner
200 .read()
201 .unwrap()
202 .by_name_version
203 .values()
204 .filter(|s| s.chains.iter().any(|c| c == chain_slug))
205 .cloned()
206 .collect()
207 }
208
209 fn history(&self, name: &str) -> Vec<Schema> {
210 let inner = self.inner.read().unwrap();
211 let versions = match inner.versions.get(name) {
212 Some(v) => v.clone(),
213 None => return Vec::new(),
214 };
215 versions
216 .iter()
217 .filter_map(|&v| {
218 inner
219 .by_name_version
220 .get(&NameVersion(name.to_string(), v))
221 .cloned()
222 })
223 .collect()
224 }
225}
226
227fn walkdir_csdl(dir: &Path) -> Result<Vec<std::path::PathBuf>, RegistryError> {
229 let mut files = Vec::new();
230 if !dir.is_dir() {
231 return Err(RegistryError::Io(std::io::Error::new(
232 std::io::ErrorKind::NotFound,
233 format!("{} is not a directory", dir.display()),
234 )));
235 }
236 for entry in std::fs::read_dir(dir).map_err(RegistryError::Io)? {
237 let entry = entry.map_err(RegistryError::Io)?;
238 let path = entry.path();
239 if path.is_dir() {
240 files.extend(walkdir_csdl(&path)?);
241 } else if path.extension().map(|e| e == "csdl").unwrap_or(false) {
242 files.push(path);
243 }
244 }
245 Ok(files)
246}
247
248#[cfg(test)]
249mod tests {
250 use super::*;
251
252 fn make_schema(name: &str, version: u32, fingerprint: &str) -> Schema {
253 use chaincodec_core::schema::SchemaMeta;
254 Schema {
255 name: name.to_string(),
256 version,
257 chains: vec!["ethereum".into()],
258 address: None,
259 event: "Transfer".into(),
260 fingerprint: EventFingerprint::new(fingerprint),
261 supersedes: None,
262 superseded_by: None,
263 deprecated: false,
264 fields: vec![],
265 meta: SchemaMeta::default(),
266 }
267 }
268
269 #[test]
270 fn add_and_lookup() {
271 let reg = MemoryRegistry::new();
272 let schema = make_schema(
273 "ERC20Transfer",
274 1,
275 "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
276 );
277 reg.add(schema.clone()).unwrap();
278
279 let found = reg.get_by_fingerprint(&EventFingerprint::new(
280 "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
281 ));
282 assert!(found.is_some());
283 assert_eq!(found.unwrap().name, "ERC20Transfer");
284 }
285
286 #[test]
287 fn duplicate_rejected() {
288 let reg = MemoryRegistry::new();
289 let s = make_schema("ERC20Transfer", 1, "0xabc");
290 reg.add(s.clone()).unwrap();
291 let err = reg.add(s);
292 assert!(err.is_err());
293 }
294
295 #[test]
296 fn latest_version_skips_deprecated() {
297 let reg = MemoryRegistry::new();
298 let mut s1 = make_schema("Foo", 1, "0x01");
299 let s2 = make_schema("Foo", 2, "0x02");
300 s1.deprecated = true;
301 reg.add(s1).unwrap();
302 reg.add(s2).unwrap();
303
304 let found = reg.get_by_name("Foo", None).unwrap();
305 assert_eq!(found.version, 2);
306 }
307}