sync_engine/schema/
mod.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Schema-based table partitioning for horizontal scalability.
5//!
6//! Routes object IDs to separate SQL tables based on prefix matching.
7//! This enables partitioning large datasets across dedicated tables
8//! while maintaining a unified API surface.
9//!
10//! # Example
11//!
12//! ```rust,no_run
13//! use sync_engine::schema::SchemaRegistry;
14//!
15//! let registry = SchemaRegistry::new();
16//!
17//! // Register prefixes to route to separate tables
18//! registry.register("view:users:", "users_items");
19//! registry.register("crdt:users:", "users_items");
20//! registry.register("view:orders:", "orders_items");
21//!
22//! // Keys are routed by longest prefix match
23//! assert_eq!(registry.table_for_key("view:users:alice"), "users_items");
24//! assert_eq!(registry.table_for_key("crdt:users:bob"), "users_items");
25//! assert_eq!(registry.table_for_key("view:orders:123"), "orders_items");
26//!
27//! // Unknown prefixes fall back to default table
28//! assert_eq!(registry.table_for_key("unknown:key"), "sync_items");
29//! ```
30//!
31//! # Design
32//!
33//! - **Longest prefix match**: More specific prefixes take precedence
34//! - **Default fallback**: Unmatched keys go to `sync_items`
35//! - **SQLite bypass**: Returns `sync_items` for all keys (no benefit from partitioning)
36//! - **Thread-safe**: Uses `parking_lot::RwLock` for concurrent access
37
38use parking_lot::RwLock;
39
/// Default table name for unmatched keys.
///
/// Also returned for every key when the registry is in bypass mode.
pub const DEFAULT_TABLE: &str = "sync_items";
42
43/// Registry mapping key prefixes to table names.
44///
45/// Thread-safe for concurrent reads with occasional writes.
46/// Prefixes are matched using longest-prefix-first semantics.
47#[derive(Debug)]
48pub struct SchemaRegistry {
49    /// Prefix -> table name mappings.
50    /// Stored sorted by prefix length (descending) for efficient matching.
51    mappings: RwLock<Vec<(String, String)>>,
52    
53    /// Whether to skip partitioning (e.g., for SQLite).
54    bypass: bool,
55}
56
57impl Default for SchemaRegistry {
58    fn default() -> Self {
59        Self::new()
60    }
61}
62
63impl SchemaRegistry {
64    /// Create a new empty registry.
65    #[must_use]
66    pub fn new() -> Self {
67        Self {
68            mappings: RwLock::new(Vec::new()),
69            bypass: false,
70        }
71    }
72    
73    /// Create a registry that bypasses all routing (returns default table).
74    /// 
75    /// Use this for SQLite where table partitioning has no benefit.
76    #[must_use]
77    pub fn bypass() -> Self {
78        Self {
79            mappings: RwLock::new(Vec::new()),
80            bypass: true,
81        }
82    }
83    
84    /// Register a prefix to route to a specific table.
85    ///
86    /// Multiple prefixes can map to the same table.
87    /// Longer prefixes take precedence over shorter ones.
88    ///
89    /// # Arguments
90    /// * `prefix` - Key prefix to match (e.g., "view:users:")
91    /// * `table_name` - Target table name (e.g., "users_items")
92    pub fn register(&self, prefix: &str, table_name: &str) {
93        let mut mappings = self.mappings.write();
94        
95        // Check if this prefix already exists
96        if let Some(pos) = mappings.iter().position(|(p, _)| p == prefix) {
97            // Update existing
98            mappings[pos].1 = table_name.to_string();
99        } else {
100            // Insert new
101            mappings.push((prefix.to_string(), table_name.to_string()));
102        }
103        
104        // Sort by prefix length descending (longest first for matching)
105        mappings.sort_by(|a, b| b.0.len().cmp(&a.0.len()));
106    }
107    
108    /// Unregister a prefix.
109    ///
110    /// Returns `true` if the prefix was found and removed.
111    pub fn unregister(&self, prefix: &str) -> bool {
112        let mut mappings = self.mappings.write();
113        if let Some(pos) = mappings.iter().position(|(p, _)| p == prefix) {
114            mappings.remove(pos);
115            true
116        } else {
117            false
118        }
119    }
120    
121    /// Get the table name for a given key.
122    ///
123    /// Uses longest-prefix-first matching. Returns [`DEFAULT_TABLE`] if no prefix matches
124    /// or if bypass mode is enabled.
125    #[must_use]
126    pub fn table_for_key(&self, key: &str) -> &'static str {
127        if self.bypass {
128            return DEFAULT_TABLE;
129        }
130        
131        let mappings = self.mappings.read();
132        
133        // Mappings are sorted by length descending, so first match is longest
134        for (prefix, table) in mappings.iter() {
135            if key.starts_with(prefix) {
136                // Return static str by leaking - tables are registered once at startup
137                // and persist for the lifetime of the process
138                return Box::leak(table.clone().into_boxed_str());
139            }
140        }
141        
142        DEFAULT_TABLE
143    }
144    
145    /// Get all registered prefixes for a table.
146    #[must_use]
147    pub fn prefixes_for_table(&self, table_name: &str) -> Vec<String> {
148        self.mappings
149            .read()
150            .iter()
151            .filter(|(_, t)| t == table_name)
152            .map(|(p, _)| p.clone())
153            .collect()
154    }
155    
156    /// Get all registered tables (excluding default).
157    #[must_use]
158    pub fn tables(&self) -> Vec<String> {
159        let mappings = self.mappings.read();
160        let mut tables: Vec<String> = mappings.iter().map(|(_, t)| t.clone()).collect();
161        tables.sort();
162        tables.dedup();
163        tables
164    }
165    
166    /// Get the number of registered prefixes.
167    #[must_use]
168    pub fn len(&self) -> usize {
169        self.mappings.read().len()
170    }
171    
172    /// Check if the registry is empty.
173    #[must_use]
174    pub fn is_empty(&self) -> bool {
175        self.mappings.read().is_empty()
176    }
177    
178    /// Check if bypass mode is enabled.
179    #[must_use]
180    pub fn is_bypass(&self) -> bool {
181        self.bypass
182    }
183    
184    /// Clear all registered prefixes.
185    pub fn clear(&self) {
186        self.mappings.write().clear();
187    }
188}
189
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a routing (non-bypass) registry with `routes` registered in order.
    fn routed(routes: &[(&str, &str)]) -> SchemaRegistry {
        let reg = SchemaRegistry::new();
        for &(prefix, table) in routes {
            reg.register(prefix, table);
        }
        reg
    }

    /// Asserts that each `(key, table)` pair in `expected` resolves as stated.
    fn assert_routes(reg: &SchemaRegistry, expected: &[(&str, &str)]) {
        for &(key, table) in expected {
            assert_eq!(reg.table_for_key(key), table, "key: {key}");
        }
    }

    #[test]
    fn test_empty_registry_returns_default() {
        let reg = routed(&[]);
        assert_routes(
            &reg,
            &[("any:key", DEFAULT_TABLE), ("view:users:alice", DEFAULT_TABLE)],
        );
    }

    #[test]
    fn test_basic_routing() {
        let reg = routed(&[("view:users:", "users_items")]);
        assert_routes(
            &reg,
            &[
                ("view:users:alice", "users_items"),
                ("view:users:bob", "users_items"),
                ("view:orders:123", DEFAULT_TABLE),
            ],
        );
    }

    #[test]
    fn test_multiple_prefixes_same_table() {
        let reg = routed(&[
            ("view:users:", "users_items"),
            ("crdt:users:", "users_items"),
        ]);
        assert_routes(
            &reg,
            &[
                ("view:users:alice", "users_items"),
                ("crdt:users:bob", "users_items"),
            ],
        );
    }

    #[test]
    fn test_longest_prefix_wins() {
        let reg = routed(&[("crdt:", "crdt_items"), ("crdt:users:", "users_items")]);
        assert_routes(
            &reg,
            &[
                // The more specific prefix takes precedence...
                ("crdt:users:alice", "users_items"),
                // ...while other keys fall through to the broader one.
                ("crdt:orders:123", "crdt_items"),
            ],
        );
    }

    #[test]
    fn test_bypass_mode() {
        let reg = SchemaRegistry::bypass();
        reg.register("view:users:", "users_items");

        // Registrations are ignored while bypassing.
        assert_routes(&reg, &[("view:users:alice", DEFAULT_TABLE)]);
        assert!(reg.is_bypass());
    }

    #[test]
    fn test_unregister() {
        let reg = routed(&[("view:users:", "users_items")]);
        assert_routes(&reg, &[("view:users:alice", "users_items")]);

        assert!(reg.unregister("view:users:"));
        assert_routes(&reg, &[("view:users:alice", DEFAULT_TABLE)]);

        // A second removal finds nothing to remove.
        assert!(!reg.unregister("view:users:"));
    }

    #[test]
    fn test_update_existing_prefix() {
        let reg = routed(&[("view:users:", "users_items")]);
        assert_routes(&reg, &[("view:users:alice", "users_items")]);

        // Registering the same prefix again swaps the target table.
        reg.register("view:users:", "users_v2_items");
        assert_routes(&reg, &[("view:users:alice", "users_v2_items")]);

        // The prefix must not have been duplicated.
        assert_eq!(reg.len(), 1);
    }

    #[test]
    fn test_prefixes_for_table() {
        let reg = routed(&[
            ("view:users:", "users_items"),
            ("crdt:users:", "users_items"),
            ("view:orders:", "orders_items"),
        ]);

        let found = reg.prefixes_for_table("users_items");
        assert_eq!(found.len(), 2);
        for wanted in ["view:users:", "crdt:users:"] {
            assert!(found.contains(&wanted.to_string()));
        }
    }

    #[test]
    fn test_tables() {
        let reg = routed(&[
            ("view:users:", "users_items"),
            ("crdt:users:", "users_items"),
            ("view:orders:", "orders_items"),
        ]);

        let names = reg.tables();
        assert_eq!(names.len(), 2);
        for wanted in ["orders_items", "users_items"] {
            assert!(names.contains(&wanted.to_string()));
        }
    }

    #[test]
    fn test_clear() {
        let reg = routed(&[
            ("view:users:", "users_items"),
            ("view:orders:", "orders_items"),
        ]);

        assert_eq!(reg.len(), 2);
        reg.clear();
        assert!(reg.is_empty());
        assert_routes(&reg, &[("view:users:alice", DEFAULT_TABLE)]);
    }

    #[test]
    fn test_order_independence() {
        // Matching must not depend on the order of registration.
        let expected = [
            ("crdt:users:alice", "users_items"),
            ("crdt:orders:123", "crdt_items"),
        ];

        // Specific prefix first, then the broad one.
        let specific_first = routed(&[("crdt:users:", "users_items"), ("crdt:", "crdt_items")]);
        assert_routes(&specific_first, &expected);

        // Broad prefix first, then the specific one.
        let broad_first = routed(&[("crdt:", "crdt_items"), ("crdt:users:", "users_items")]);
        assert_routes(&broad_first, &expected);
    }
}