sync_engine/schema/
mod.rs1use parking_lot::RwLock;
39
40pub const DEFAULT_TABLE: &str = "sync_items";
42
43#[derive(Debug)]
48pub struct SchemaRegistry {
49 mappings: RwLock<Vec<(String, String)>>,
52
53 bypass: bool,
55}
56
57impl Default for SchemaRegistry {
58 fn default() -> Self {
59 Self::new()
60 }
61}
62
63impl SchemaRegistry {
64 #[must_use]
66 pub fn new() -> Self {
67 Self {
68 mappings: RwLock::new(Vec::new()),
69 bypass: false,
70 }
71 }
72
73 #[must_use]
77 pub fn bypass() -> Self {
78 Self {
79 mappings: RwLock::new(Vec::new()),
80 bypass: true,
81 }
82 }
83
84 pub fn register(&self, prefix: &str, table_name: &str) {
93 let mut mappings = self.mappings.write();
94
95 if let Some(pos) = mappings.iter().position(|(p, _)| p == prefix) {
97 mappings[pos].1 = table_name.to_string();
99 } else {
100 mappings.push((prefix.to_string(), table_name.to_string()));
102 }
103
104 mappings.sort_by(|a, b| b.0.len().cmp(&a.0.len()));
106 }
107
108 pub fn unregister(&self, prefix: &str) -> bool {
112 let mut mappings = self.mappings.write();
113 if let Some(pos) = mappings.iter().position(|(p, _)| p == prefix) {
114 mappings.remove(pos);
115 true
116 } else {
117 false
118 }
119 }
120
121 #[must_use]
126 pub fn table_for_key(&self, key: &str) -> &'static str {
127 if self.bypass {
128 return DEFAULT_TABLE;
129 }
130
131 let mappings = self.mappings.read();
132
133 for (prefix, table) in mappings.iter() {
135 if key.starts_with(prefix) {
136 return Box::leak(table.clone().into_boxed_str());
139 }
140 }
141
142 DEFAULT_TABLE
143 }
144
145 #[must_use]
147 pub fn prefixes_for_table(&self, table_name: &str) -> Vec<String> {
148 self.mappings
149 .read()
150 .iter()
151 .filter(|(_, t)| t == table_name)
152 .map(|(p, _)| p.clone())
153 .collect()
154 }
155
156 #[must_use]
158 pub fn tables(&self) -> Vec<String> {
159 let mappings = self.mappings.read();
160 let mut tables: Vec<String> = mappings.iter().map(|(_, t)| t.clone()).collect();
161 tables.sort();
162 tables.dedup();
163 tables
164 }
165
166 #[must_use]
168 pub fn len(&self) -> usize {
169 self.mappings.read().len()
170 }
171
172 #[must_use]
174 pub fn is_empty(&self) -> bool {
175 self.mappings.read().is_empty()
176 }
177
178 #[must_use]
180 pub fn is_bypass(&self) -> bool {
181 self.bypass
182 }
183
184 pub fn clear(&self) {
186 self.mappings.write().clear();
187 }
188}
189
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a routing (non-bypass) registry and registers `routes` in the
    /// order given, each as a `(prefix, table)` pair.
    fn registry_with(routes: &[(&str, &str)]) -> SchemaRegistry {
        let registry = SchemaRegistry::new();
        for &(prefix, table) in routes {
            registry.register(prefix, table);
        }
        registry
    }

    #[test]
    fn test_empty_registry_returns_default() {
        let empty = SchemaRegistry::new();

        assert_eq!(empty.table_for_key("any:key"), DEFAULT_TABLE);
        assert_eq!(empty.table_for_key("view:users:alice"), DEFAULT_TABLE);
    }

    #[test]
    fn test_basic_routing() {
        let routed = registry_with(&[("view:users:", "users_items")]);

        // Keys under the registered prefix go to its table.
        assert_eq!(routed.table_for_key("view:users:alice"), "users_items");
        assert_eq!(routed.table_for_key("view:users:bob"), "users_items");
        // Anything else falls through to the default.
        assert_eq!(routed.table_for_key("view:orders:123"), DEFAULT_TABLE);
    }

    #[test]
    fn test_multiple_prefixes_same_table() {
        let routed = registry_with(&[
            ("view:users:", "users_items"),
            ("crdt:users:", "users_items"),
        ]);

        assert_eq!(routed.table_for_key("view:users:alice"), "users_items");
        assert_eq!(routed.table_for_key("crdt:users:bob"), "users_items");
    }

    #[test]
    fn test_longest_prefix_wins() {
        let routed = registry_with(&[("crdt:", "crdt_items"), ("crdt:users:", "users_items")]);

        // The more specific prefix takes precedence over the general one.
        assert_eq!(routed.table_for_key("crdt:users:alice"), "users_items");
        // Only the general prefix matches here.
        assert_eq!(routed.table_for_key("crdt:orders:123"), "crdt_items");
    }

    #[test]
    fn test_bypass_mode() {
        let bypassed = SchemaRegistry::bypass();
        bypassed.register("view:users:", "users_items");

        // Registered mappings are ignored while bypassing.
        assert_eq!(bypassed.table_for_key("view:users:alice"), DEFAULT_TABLE);
        assert!(bypassed.is_bypass());
    }

    #[test]
    fn test_unregister() {
        let routed = registry_with(&[("view:users:", "users_items")]);
        assert_eq!(routed.table_for_key("view:users:alice"), "users_items");

        // First removal succeeds and the key reverts to the default table.
        assert!(routed.unregister("view:users:"));
        assert_eq!(routed.table_for_key("view:users:alice"), DEFAULT_TABLE);

        // Removing the same prefix again reports that nothing was there.
        assert!(!routed.unregister("view:users:"));
    }

    #[test]
    fn test_update_existing_prefix() {
        let routed = registry_with(&[("view:users:", "users_items")]);
        assert_eq!(routed.table_for_key("view:users:alice"), "users_items");

        // Re-registering the same prefix replaces its table...
        routed.register("view:users:", "users_v2_items");
        assert_eq!(routed.table_for_key("view:users:alice"), "users_v2_items");

        // ...without adding a second entry.
        assert_eq!(routed.len(), 1);
    }

    #[test]
    fn test_prefixes_for_table() {
        let routed = registry_with(&[
            ("view:users:", "users_items"),
            ("crdt:users:", "users_items"),
            ("view:orders:", "orders_items"),
        ]);

        let user_prefixes = routed.prefixes_for_table("users_items");

        assert_eq!(user_prefixes.len(), 2);
        assert!(user_prefixes.contains(&"view:users:".to_string()));
        assert!(user_prefixes.contains(&"crdt:users:".to_string()));
    }

    #[test]
    fn test_tables() {
        let routed = registry_with(&[
            ("view:users:", "users_items"),
            ("crdt:users:", "users_items"),
            ("view:orders:", "orders_items"),
        ]);

        let names = routed.tables();

        // Two prefixes share `users_items`, so only two distinct tables exist.
        assert_eq!(names.len(), 2);
        assert!(names.contains(&"orders_items".to_string()));
        assert!(names.contains(&"users_items".to_string()));
    }

    #[test]
    fn test_clear() {
        let routed = registry_with(&[
            ("view:users:", "users_items"),
            ("view:orders:", "orders_items"),
        ]);
        assert_eq!(routed.len(), 2);

        routed.clear();

        assert!(routed.is_empty());
        assert_eq!(routed.table_for_key("view:users:alice"), DEFAULT_TABLE);
    }

    #[test]
    fn test_order_independence() {
        // Specific prefix registered before the general one.
        let specific_first =
            registry_with(&[("crdt:users:", "users_items"), ("crdt:", "crdt_items")]);
        assert_eq!(
            specific_first.table_for_key("crdt:users:alice"),
            "users_items"
        );
        assert_eq!(
            specific_first.table_for_key("crdt:orders:123"),
            "crdt_items"
        );

        // General prefix registered before the specific one: same routing.
        let general_first =
            registry_with(&[("crdt:", "crdt_items"), ("crdt:users:", "users_items")]);
        assert_eq!(
            general_first.table_for_key("crdt:users:alice"),
            "users_items"
        );
        assert_eq!(general_first.table_for_key("crdt:orders:123"), "crdt_items");
    }
}