sync_engine/coordinator/
schema_api.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Schema management API for table partitioning.
5//!
6//! Provides methods for registering schemas and routing keys to specific tables.
7
8use tracing::info;
9
10use crate::storage::traits::StorageError;
11
12use super::SyncEngine;
13
14impl SyncEngine {
15    /// Register a schema prefix to route keys to a specific table.
16    ///
17    /// Creates the table if it doesn't exist (MySQL only - SQLite ignores partitioning).
18    /// Multiple prefixes can map to the same table.
19    ///
20    /// # Arguments
21    /// * `schema_name` - Schema name used to derive table name (e.g., "users" → "users_items")
22    /// * `prefix` - Key prefix to route (e.g., "view:users:" or "crdt:users:")
23    ///
24    /// # Example
25    ///
26    /// ```rust,no_run
27    /// # use sync_engine::SyncEngine;
28    /// # async fn example(engine: &mut SyncEngine) {
29    /// // Register view and crdt prefixes for the users schema
30    /// engine.register_schema("users", "view:users:").await.unwrap();
31    /// engine.register_schema("users", "crdt:users:").await.unwrap();
32    ///
33    /// // Now these keys route to users_items table:
34    /// // - view:users:alice
35    /// // - crdt:users:bob
36    /// // - view:users:_schema:users
37    /// # }
38    /// ```
39    pub async fn register_schema(&self, schema_name: &str, prefix: &str) -> Result<(), StorageError> {
40        let table_name = format!("{}_items", schema_name);
41        
42        // Create table if it doesn't exist (MySQL only)
43        if let Some(ref sql_store) = self.sql_store {
44            sql_store.ensure_table(&table_name).await?;
45        }
46        
47        // Register prefix in registry
48        self.schema_registry.register(prefix, &table_name);
49        
50        info!(
51            schema = %schema_name,
52            prefix = %prefix,
53            table = %table_name,
54            "Schema registered"
55        );
56        
57        Ok(())
58    }
59    
60    /// Register multiple prefixes for a schema in one call.
61    ///
62    /// This is a convenience method for registering both view and crdt prefixes.
63    ///
64    /// # Example
65    ///
66    /// ```rust,no_run
67    /// # use sync_engine::SyncEngine;
68    /// # async fn example(engine: &mut SyncEngine) {
69    /// // Register both prefixes at once
70    /// engine.register_schema_prefixes("users", &["view:users:", "crdt:users:"]).await.unwrap();
71    /// # }
72    /// ```
73    pub async fn register_schema_prefixes(&self, schema_name: &str, prefixes: &[&str]) -> Result<(), StorageError> {
74        for prefix in prefixes {
75            self.register_schema(schema_name, prefix).await?;
76        }
77        Ok(())
78    }
79    
80    /// Unregister a prefix (for testing or cleanup).
81    ///
82    /// Note: This does NOT drop the table - it only removes the routing.
83    pub fn unregister_prefix(&self, prefix: &str) -> bool {
84        self.schema_registry.unregister(prefix)
85    }
86    
87    /// Get the table name for a given key.
88    ///
89    /// Returns "sync_items" for unregistered prefixes.
90    #[must_use]
91    pub fn table_for_key(&self, key: &str) -> &'static str {
92        self.schema_registry.table_for_key(key)
93    }
94    
95    /// Get all registered schema tables.
96    #[must_use]
97    pub fn registered_tables(&self) -> Vec<String> {
98        self.schema_registry.tables()
99    }
100    
101    /// Get all prefixes routing to a specific table.
102    #[must_use]
103    pub fn prefixes_for_table(&self, table_name: &str) -> Vec<String> {
104        self.schema_registry.prefixes_for_table(table_name)
105    }
106    
107    /// Check if schema partitioning is in bypass mode (SQLite).
108    #[must_use]
109    pub fn is_schema_bypass(&self) -> bool {
110        self.schema_registry.is_bypass()
111    }
112}
113
114#[cfg(test)]
115mod tests {
116    use super::*;
117    use crate::config::SyncEngineConfig;
118    use tokio::sync::watch;
119
120    fn test_config() -> SyncEngineConfig {
121        SyncEngineConfig {
122            redis_url: None,
123            sql_url: None,
124            wal_path: None,
125            l1_max_bytes: 1024 * 1024,
126            ..Default::default()
127        }
128    }
129
130    #[test]
131    fn test_table_for_key_empty_registry() {
132        let config = test_config();
133        let (_tx, rx) = watch::channel(config.clone());
134        let engine = SyncEngine::new(config, rx);
135        
136        // No schemas registered - everything goes to default
137        assert_eq!(engine.table_for_key("view:users:alice"), "sync_items");
138        assert_eq!(engine.table_for_key("crdt:users:bob"), "sync_items");
139    }
140
141    #[test]
142    fn test_table_for_key_with_registrations() {
143        let config = test_config();
144        let (_tx, rx) = watch::channel(config.clone());
145        let engine = SyncEngine::new(config, rx);
146        
147        // Register prefixes manually (bypassing async ensure_table)
148        engine.schema_registry.register("view:users:", "users_items");
149        engine.schema_registry.register("crdt:users:", "users_items");
150        
151        assert_eq!(engine.table_for_key("view:users:alice"), "users_items");
152        assert_eq!(engine.table_for_key("crdt:users:bob"), "users_items");
153        assert_eq!(engine.table_for_key("unknown:key"), "sync_items");
154    }
155
156    #[test]
157    fn test_registered_tables() {
158        let config = test_config();
159        let (_tx, rx) = watch::channel(config.clone());
160        let engine = SyncEngine::new(config, rx);
161        
162        engine.schema_registry.register("view:users:", "users_items");
163        engine.schema_registry.register("view:orders:", "orders_items");
164        
165        let tables = engine.registered_tables();
166        assert!(tables.contains(&"users_items".to_string()));
167        assert!(tables.contains(&"orders_items".to_string()));
168    }
169
170    #[test]
171    fn test_unregister_prefix() {
172        let config = test_config();
173        let (_tx, rx) = watch::channel(config.clone());
174        let engine = SyncEngine::new(config, rx);
175        
176        engine.schema_registry.register("view:users:", "users_items");
177        assert_eq!(engine.table_for_key("view:users:alice"), "users_items");
178        
179        assert!(engine.unregister_prefix("view:users:"));
180        assert_eq!(engine.table_for_key("view:users:alice"), "sync_items");
181    }
182}