database_replicator/
config.rs1use crate::table_rules::{QualifiedTable, TableRules};
5use anyhow::{Context, Result};
6use serde::Deserialize;
7use std::collections::HashMap;
8use std::fs;
9
10#[derive(Debug, Deserialize)]
11struct ReplicationConfig {
12 #[serde(default)]
13 databases: HashMap<String, DatabaseConfig>,
14}
15
16#[derive(Debug, Deserialize, Default)]
17struct DatabaseConfig {
18 #[serde(default)]
19 schema_only: Vec<String>,
20 #[serde(default)]
21 table_filters: Vec<TableFilterConfig>,
22 #[serde(default)]
23 time_filters: Vec<TimeFilterConfig>,
24}
25
26#[derive(Debug, Deserialize)]
27struct TableFilterConfig {
28 table: String,
29 #[serde(default)]
30 schema: Option<String>,
31 #[serde(rename = "where")]
32 predicate: String,
33}
34
35#[derive(Debug, Deserialize)]
36struct TimeFilterConfig {
37 table: String,
38 #[serde(default)]
39 schema: Option<String>,
40 column: String,
41 last: String,
42}
43
44pub fn load_table_rules_from_file(path: &str) -> Result<TableRules> {
45 let raw = fs::read_to_string(path)
46 .with_context(|| format!("Failed to read config file at {}", path))?;
47 let parsed: ReplicationConfig =
48 toml::from_str(&raw).with_context(|| format!("Failed to parse TOML config at {}", path))?;
49
50 let mut rules = TableRules::default();
51 for (db_name, db) in parsed.databases {
52 for table in db.schema_only {
53 let qualified = QualifiedTable::parse(&table)?.with_database(Some(db_name.clone()));
54 rules.add_schema_only_table(qualified)?;
55 }
56 for filter in db.table_filters {
57 let qualified = if let Some(schema) = filter.schema {
59 QualifiedTable::new(Some(db_name.clone()), schema, filter.table)
60 } else {
61 QualifiedTable::parse(&filter.table)?.with_database(Some(db_name.clone()))
63 };
64 rules.add_table_filter(qualified, filter.predicate)?;
65 }
66 for filter in db.time_filters {
67 let qualified = if let Some(schema) = filter.schema {
69 QualifiedTable::new(Some(db_name.clone()), schema, filter.table)
70 } else {
71 QualifiedTable::parse(&filter.table)?.with_database(Some(db_name.clone()))
73 };
74 rules.add_time_filter(qualified, filter.column, filter.last)?;
75 }
76 }
77
78 Ok(rules)
79}
80
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Writes `contents` to a fresh temporary file and loads the table rules
    /// from it, panicking if any step fails.
    fn load_rules(contents: &str) -> TableRules {
        let mut file = NamedTempFile::new().unwrap();
        write!(file, "{}", contents).unwrap();
        let path = file.path().to_str().unwrap();
        load_table_rules_from_file(path).unwrap()
    }

    /// Bare table names in every section are resolved under `public`.
    #[test]
    fn parse_sample_config() {
        let rules = load_rules(
            r#"
            [databases.kong]
            schema_only = ["evmlog_strides", "price"]

            [[databases.kong.table_filters]]
            table = "output"
            where = "series_time >= NOW() - INTERVAL '6 months'"

            [[databases.kong.time_filters]]
            table = "metrics"
            column = "created_at"
            last = "1 year"
            "#,
        );

        assert_eq!(
            rules.schema_only_tables("kong"),
            vec!["\"public\".\"evmlog_strides\"", "\"public\".\"price\""]
        );
        assert!(rules.table_filter("kong", "public", "output").is_some());
        assert!(rules.time_filter("kong", "public", "metrics").is_some());
    }

    /// An explicit `schema` key is honoured for both filter kinds.
    #[test]
    fn test_toml_with_explicit_schema() {
        let rules = load_rules(
            r#"
            [databases.db1]

            [[databases.db1.table_filters]]
            table = "orders"
            schema = "analytics"
            where = "created_at > NOW() - INTERVAL '1 day'"

            [[databases.db1.time_filters]]
            table = "metrics"
            schema = "reporting"
            column = "timestamp"
            last = "6 months"
            "#,
        );

        assert!(rules.table_filter("db1", "analytics", "orders").is_some());
        assert!(rules.time_filter("db1", "reporting", "metrics").is_some());
    }

    /// Omitting `schema` still works and lands the table in `public`.
    #[test]
    fn test_toml_backward_compatibility() {
        let rules = load_rules(
            r#"
            [databases.db1]

            [[databases.db1.table_filters]]
            table = "orders"
            where = "created_at > NOW() - INTERVAL '1 day'"
            "#,
        );

        assert!(rules.table_filter("db1", "public", "orders").is_some());
    }

    /// Dotted names, explicit `schema` keys and bare names can be mixed
    /// within one database section.
    #[test]
    fn test_toml_mixed_notation() {
        let rules = load_rules(
            r#"
            [databases.db1]
            schema_only = ["analytics.large_table", "public.temp"]

            [[databases.db1.table_filters]]
            table = "events"
            schema = "analytics"
            where = "created_at > NOW() - INTERVAL '90 days'"

            [[databases.db1.table_filters]]
            table = "logs"
            where = "timestamp > NOW() - INTERVAL '7 days'"
            "#,
        );

        let schema_only = rules.schema_only_tables("db1");
        assert!(schema_only.contains(&"\"analytics\".\"large_table\"".to_string()));
        assert!(schema_only.contains(&"\"public\".\"temp\"".to_string()));

        assert!(rules.table_filter("db1", "analytics", "events").is_some());
        assert!(rules.table_filter("db1", "public", "logs").is_some());
    }
}