sync_engine/coordinator/
schema_api.rs1use tracing::info;
9
10use crate::storage::traits::StorageError;
11
12use super::SyncEngine;
13
14impl SyncEngine {
15 pub async fn register_schema(&self, schema_name: &str, prefix: &str) -> Result<(), StorageError> {
40 let table_name = format!("{}_items", schema_name);
41
42 if let Some(ref sql_store) = self.sql_store {
44 sql_store.ensure_table(&table_name).await?;
45 }
46
47 self.schema_registry.register(prefix, &table_name);
49
50 info!(
51 schema = %schema_name,
52 prefix = %prefix,
53 table = %table_name,
54 "Schema registered"
55 );
56
57 Ok(())
58 }
59
60 pub async fn register_schema_prefixes(&self, schema_name: &str, prefixes: &[&str]) -> Result<(), StorageError> {
74 for prefix in prefixes {
75 self.register_schema(schema_name, prefix).await?;
76 }
77 Ok(())
78 }
79
80 pub fn unregister_prefix(&self, prefix: &str) -> bool {
84 self.schema_registry.unregister(prefix)
85 }
86
87 #[must_use]
91 pub fn table_for_key(&self, key: &str) -> &'static str {
92 self.schema_registry.table_for_key(key)
93 }
94
95 #[must_use]
97 pub fn registered_tables(&self) -> Vec<String> {
98 self.schema_registry.tables()
99 }
100
101 #[must_use]
103 pub fn prefixes_for_table(&self, table_name: &str) -> Vec<String> {
104 self.schema_registry.prefixes_for_table(table_name)
105 }
106
107 #[must_use]
109 pub fn is_schema_bypass(&self) -> bool {
110 self.schema_registry.is_bypass()
111 }
112}
113
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SyncEngineConfig;
    use tokio::sync::watch;

    /// Builds the configuration shared by all tests: `redis_url`, `sql_url`
    /// and `wal_path` are unset, `l1_max_bytes` is 1 MiB, and every other
    /// field takes its default value.
    fn test_config() -> SyncEngineConfig {
        SyncEngineConfig {
            redis_url: None,
            sql_url: None,
            wal_path: None,
            l1_max_bytes: 1024 * 1024,
            ..Default::default()
        }
    }

    /// With nothing registered, every key resolves to `"sync_items"`.
    #[test]
    fn test_table_for_key_empty_registry() {
        let cfg = test_config();
        // The sender stays bound for the whole test so the channel is not closed.
        let (_config_tx, config_rx) = watch::channel(cfg.clone());
        let engine = SyncEngine::new(cfg, config_rx);

        for &key in &["view:users:alice", "crdt:users:bob"] {
            assert_eq!(engine.table_for_key(key), "sync_items");
        }
    }

    /// Registered prefixes route to their table; anything else resolves to
    /// `"sync_items"`.
    #[test]
    fn test_table_for_key_with_registrations() {
        let cfg = test_config();
        let (_config_tx, config_rx) = watch::channel(cfg.clone());
        let engine = SyncEngine::new(cfg, config_rx);

        for &prefix in &["view:users:", "crdt:users:"] {
            engine.schema_registry.register(prefix, "users_items");
        }

        let expectations = [
            ("view:users:alice", "users_items"),
            ("crdt:users:bob", "users_items"),
            ("unknown:key", "sync_items"),
        ];
        for &(key, expected_table) in &expectations {
            assert_eq!(engine.table_for_key(key), expected_table);
        }
    }

    /// Every table that received a registration is listed by
    /// `registered_tables`.
    #[test]
    fn test_registered_tables() {
        let cfg = test_config();
        let (_config_tx, config_rx) = watch::channel(cfg.clone());
        let engine = SyncEngine::new(cfg, config_rx);

        engine.schema_registry.register("view:users:", "users_items");
        engine.schema_registry.register("view:orders:", "orders_items");

        let tables = engine.registered_tables();
        for &expected in &["users_items", "orders_items"] {
            assert!(tables.iter().any(|table| table.as_str() == expected));
        }
    }

    /// After unregistering a prefix, its keys resolve to `"sync_items"` again.
    #[test]
    fn test_unregister_prefix() {
        let cfg = test_config();
        let (_config_tx, config_rx) = watch::channel(cfg.clone());
        let engine = SyncEngine::new(cfg, config_rx);

        let prefix = "view:users:";
        let key = "view:users:alice";

        engine.schema_registry.register(prefix, "users_items");
        assert_eq!(engine.table_for_key(key), "users_items");

        assert!(engine.unregister_prefix(prefix));
        assert_eq!(engine.table_for_key(key), "sync_items");
    }
}