database_replicator/
config.rs

// ABOUTME: Parses replication configuration files for table-level rules
// ABOUTME: Converts TOML format into TableRules structures
3
4use crate::table_rules::{QualifiedTable, TableRules};
5use anyhow::{Context, Result};
6use serde::Deserialize;
7use std::collections::HashMap;
8use std::fs;
9
10#[derive(Debug, Deserialize)]
11struct ReplicationConfig {
12    #[serde(default)]
13    databases: HashMap<String, DatabaseConfig>,
14}
15
16#[derive(Debug, Deserialize, Default)]
17struct DatabaseConfig {
18    #[serde(default)]
19    schema_only: Vec<String>,
20    #[serde(default)]
21    table_filters: Vec<TableFilterConfig>,
22    #[serde(default)]
23    time_filters: Vec<TimeFilterConfig>,
24}
25
26#[derive(Debug, Deserialize)]
27struct TableFilterConfig {
28    table: String,
29    #[serde(default)]
30    schema: Option<String>,
31    #[serde(rename = "where")]
32    predicate: String,
33}
34
35#[derive(Debug, Deserialize)]
36struct TimeFilterConfig {
37    table: String,
38    #[serde(default)]
39    schema: Option<String>,
40    column: String,
41    last: String,
42}
43
44pub fn load_table_rules_from_file(path: &str) -> Result<TableRules> {
45    let raw = fs::read_to_string(path)
46        .with_context(|| format!("Failed to read config file at {}", path))?;
47    let parsed: ReplicationConfig =
48        toml::from_str(&raw).with_context(|| format!("Failed to parse TOML config at {}", path))?;
49
50    let mut rules = TableRules::default();
51    for (db_name, db) in parsed.databases {
52        for table in db.schema_only {
53            let qualified = QualifiedTable::parse(&table)?.with_database(Some(db_name.clone()));
54            rules.add_schema_only_table(qualified)?;
55        }
56        for filter in db.table_filters {
57            // If explicit schema field is provided, use it; otherwise parse from table name
58            let qualified = if let Some(schema) = filter.schema {
59                QualifiedTable::new(Some(db_name.clone()), schema, filter.table)
60            } else {
61                // Parse table name which might be "schema.table" or just "table"
62                QualifiedTable::parse(&filter.table)?.with_database(Some(db_name.clone()))
63            };
64            rules.add_table_filter(qualified, filter.predicate)?;
65        }
66        for filter in db.time_filters {
67            // If explicit schema field is provided, use it; otherwise parse from table name
68            let qualified = if let Some(schema) = filter.schema {
69                QualifiedTable::new(Some(db_name.clone()), schema, filter.table)
70            } else {
71                // Parse table name which might be "schema.table" or just "table"
72                QualifiedTable::parse(&filter.table)?.with_database(Some(db_name.clone()))
73            };
74            rules.add_time_filter(qualified, filter.column, filter.last)?;
75        }
76    }
77
78    Ok(rules)
79}
80
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Writes `contents` to a fresh temporary file and loads the rules from it,
    /// panicking on any I/O or parse failure.
    fn rules_from_toml(contents: &str) -> TableRules {
        let mut tmp = NamedTempFile::new().unwrap();
        write!(tmp, "{}", contents).unwrap();
        load_table_rules_from_file(tmp.path().to_str().unwrap()).unwrap()
    }

    /// Bare table names in every rule kind end up in the `public` schema.
    #[test]
    fn parse_sample_config() {
        let rules = rules_from_toml(
            r#"
            [databases.kong]
            schema_only = ["evmlog_strides", "price"]

            [[databases.kong.table_filters]]
            table = "output"
            where = "series_time >= NOW() - INTERVAL '6 months'"

            [[databases.kong.time_filters]]
            table = "metrics"
            column = "created_at"
            last = "1 year"
        "#,
        );

        assert_eq!(
            rules.schema_only_tables("kong"),
            vec!["\"public\".\"evmlog_strides\"", "\"public\".\"price\""]
        );
        assert!(rules.table_filter("kong", "public", "output").is_some());
        assert!(rules.time_filter("kong", "public", "metrics").is_some());
    }

    /// An explicit `schema` field decides which schema the rule is filed under.
    #[test]
    fn test_toml_with_explicit_schema() {
        let rules = rules_from_toml(
            r#"
            [databases.db1]

            [[databases.db1.table_filters]]
            table = "orders"
            schema = "analytics"
            where = "created_at > NOW() - INTERVAL '1 day'"

            [[databases.db1.time_filters]]
            table = "metrics"
            schema = "reporting"
            column = "timestamp"
            last = "6 months"
        "#,
        );

        assert!(rules.table_filter("db1", "analytics", "orders").is_some());
        assert!(rules.time_filter("db1", "reporting", "metrics").is_some());
    }

    /// Configs written without a `schema` field keep resolving to `public`.
    #[test]
    fn test_toml_backward_compatibility() {
        let rules = rules_from_toml(
            r#"
            [databases.db1]

            [[databases.db1.table_filters]]
            table = "orders"
            where = "created_at > NOW() - INTERVAL '1 day'"
        "#,
        );

        assert!(rules.table_filter("db1", "public", "orders").is_some());
    }

    /// Dot notation, explicit schema fields and bare names can be mixed in one file.
    #[test]
    fn test_toml_mixed_notation() {
        let rules = rules_from_toml(
            r#"
            [databases.db1]
            schema_only = ["analytics.large_table", "public.temp"]

            [[databases.db1.table_filters]]
            table = "events"
            schema = "analytics"
            where = "created_at > NOW() - INTERVAL '90 days'"

            [[databases.db1.table_filters]]
            table = "logs"
            where = "timestamp > NOW() - INTERVAL '7 days'"
        "#,
        );

        // schema_only entries written with dot notation
        let schema_only = rules.schema_only_tables("db1");
        assert!(schema_only.contains(&"\"analytics\".\"large_table\"".to_string()));
        assert!(schema_only.contains(&"\"public\".\"temp\"".to_string()));

        // explicit schema field
        assert!(rules.table_filter("db1", "analytics", "events").is_some());

        // no schema given: falls back to public
        assert!(rules.table_filter("db1", "public", "logs").is_some());
    }
}