// oversync_core/table_names.rs

/// Maximum length, in bytes, of the sanitized source fragment embedded in
/// per-source table names; longer fragments are truncated and hash-suffixed.
const MAX_SOURCE_IDENTIFIER_LEN: usize = 48;

/// Number of hex characters of the SHA-256 digest appended to truncated names.
const TRUNCATED_HASH_LEN: usize = 12;

// `short_hash` emits two hex characters per digest byte, so the length must be
// even and fit in a 32-byte SHA-256 digest; the truncation prefix
// (`MAX - HASH - 1`, the 1 being the `_` separator) must stay non-empty.
const _: () = assert!(
    TRUNCATED_HASH_LEN % 2 == 0
        && TRUNCATED_HASH_LEN / 2 <= 32
        && TRUNCATED_HASH_LEN + 1 < MAX_SOURCE_IDENTIFIER_LEN
);

/// Names of the three tables that back a sync source.
///
/// Build one with [`TableNames::for_source`] for per-source tables, or with
/// [`TableNames::default_shared`] for the backward-compatible shared names.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableNames {
    /// Per-row table (`row_data`, `row_hash`, `prev_hash`, `cycle_id`), unique
    /// on `(origin_id, query_id, row_key)`.
    pub snapshot: String,
    /// One row per sync cycle with its status, timestamps and row counters.
    pub cycle_log: String,
    /// Serialized events (`events_json`) recorded per origin, query and cycle.
    pub pending_event: String,
}

17impl TableNames {
18 pub fn for_source(source_name: &str) -> Self {
23 let safe = sanitize_name(source_name);
24 Self {
25 snapshot: format!("sync_{safe}_snapshot"),
26 cycle_log: format!("sync_{safe}_cycle_log"),
27 pending_event: format!("sync_{safe}_pending"),
28 }
29 }
30
31 pub fn default_shared() -> Self {
33 Self {
34 snapshot: "snapshot".into(),
35 cycle_log: "cycle_log".into(),
36 pending_event: "pending_event".into(),
37 }
38 }
39
40 pub fn create_ddl(&self) -> String {
42 format!(
43 "DEFINE TABLE IF NOT EXISTS {snap} SCHEMAFULL;\
44 DEFINE FIELD IF NOT EXISTS origin_id ON {snap} TYPE string;\
45 DEFINE FIELD IF NOT EXISTS query_id ON {snap} TYPE string;\
46 DEFINE FIELD IF NOT EXISTS row_key ON {snap} TYPE string;\
47 DEFINE FIELD IF NOT EXISTS row_data ON {snap} TYPE object FLEXIBLE;\
48 DEFINE FIELD IF NOT EXISTS row_hash ON {snap} TYPE string;\
49 DEFINE FIELD IF NOT EXISTS cycle_id ON {snap} TYPE int;\
50 DEFINE FIELD IF NOT EXISTS updated_at ON {snap} TYPE datetime DEFAULT time::now();\
51 DEFINE FIELD IF NOT EXISTS prev_hash ON {snap} TYPE option<string>;\
52 DEFINE INDEX IF NOT EXISTS idx_{snap}_key ON {snap} FIELDS origin_id, query_id, row_key UNIQUE;\
53 DEFINE INDEX IF NOT EXISTS idx_{snap}_cycle ON {snap} FIELDS origin_id, query_id, cycle_id;\
54 DEFINE TABLE IF NOT EXISTS {cl} SCHEMAFULL;\
55 DEFINE FIELD IF NOT EXISTS origin_id ON {cl} TYPE string;\
56 DEFINE FIELD IF NOT EXISTS query_id ON {cl} TYPE string;\
57 DEFINE FIELD IF NOT EXISTS cycle_id ON {cl} TYPE int;\
58 DEFINE FIELD IF NOT EXISTS started_at ON {cl} TYPE datetime;\
59 DEFINE FIELD IF NOT EXISTS finished_at ON {cl} TYPE option<datetime>;\
60 DEFINE FIELD IF NOT EXISTS status ON {cl} TYPE string DEFAULT 'running';\
61 DEFINE FIELD IF NOT EXISTS rows_fetched ON {cl} TYPE int DEFAULT 0;\
62 DEFINE FIELD IF NOT EXISTS rows_created ON {cl} TYPE int DEFAULT 0;\
63 DEFINE FIELD IF NOT EXISTS rows_updated ON {cl} TYPE int DEFAULT 0;\
64 DEFINE FIELD IF NOT EXISTS rows_deleted ON {cl} TYPE int DEFAULT 0;\
65 DEFINE INDEX IF NOT EXISTS idx_{cl}_source ON {cl} FIELDS origin_id, query_id, cycle_id UNIQUE;\
66 DEFINE TABLE IF NOT EXISTS {pe} SCHEMAFULL;\
67 DEFINE FIELD IF NOT EXISTS origin_id ON {pe} TYPE string;\
68 DEFINE FIELD IF NOT EXISTS query_id ON {pe} TYPE string;\
69 DEFINE FIELD IF NOT EXISTS cycle_id ON {pe} TYPE int;\
70 DEFINE FIELD IF NOT EXISTS events_json ON {pe} TYPE string;\
71 DEFINE FIELD IF NOT EXISTS created_at ON {pe} TYPE datetime DEFAULT time::now();\
72 DEFINE INDEX IF NOT EXISTS idx_{pe}_source ON {pe} FIELDS origin_id, query_id;",
73 snap = self.snapshot,
74 cl = self.cycle_log,
75 pe = self.pending_event,
76 )
77 }
78
79 pub fn resolve_sql(&self, template: &str) -> String {
81 template
82 .replace("{snapshot}", &self.snapshot)
83 .replace("{cycle_log}", &self.cycle_log)
84 .replace("{pending_event}", &self.pending_event)
85 }
86}
87
88fn sanitize_name(name: &str) -> String {
89 let mut sanitized = String::with_capacity(name.len());
90 let mut prev_was_underscore = false;
91
92 for c in name.chars() {
93 let mapped = if c.is_ascii_alphanumeric() || c == '_' {
94 c.to_ascii_lowercase()
95 } else {
96 '_'
97 };
98
99 if mapped == '_' {
100 if !prev_was_underscore {
101 sanitized.push('_');
102 }
103 prev_was_underscore = true;
104 } else {
105 sanitized.push(mapped);
106 prev_was_underscore = false;
107 }
108 }
109
110 let sanitized = sanitized.trim_matches('_');
111 if sanitized.is_empty() {
112 return "unnamed".to_string();
113 }
114
115 if sanitized.len() <= MAX_SOURCE_IDENTIFIER_LEN {
116 return sanitized.to_string();
117 }
118
119 let hash = short_hash(sanitized);
120 let prefix_len = MAX_SOURCE_IDENTIFIER_LEN - TRUNCATED_HASH_LEN - 1;
121 let prefix = &sanitized[..prefix_len];
122 format!("{prefix}_{hash}")
123}
124
125fn short_hash(name: &str) -> String {
126 use sha2::{Digest, Sha256};
127
128 let digest = Sha256::digest(name.as_bytes());
129 const_hex::encode(&digest[..TRUNCATED_HASH_LEN / 2])
130}
131
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn for_source_basic() {
        let t = TableNames::for_source("pg-prod");
        assert_eq!(t.snapshot, "sync_pg_prod_snapshot");
        assert_eq!(t.cycle_log, "sync_pg_prod_cycle_log");
        assert_eq!(t.pending_event, "sync_pg_prod_pending");
    }

    #[test]
    fn for_source_sanitizes_special_chars() {
        let t = TableNames::for_source("my.trino/analytics");
        assert_eq!(t.snapshot, "sync_my_trino_analytics_snapshot");
    }

    #[test]
    fn for_source_collapses_repeated_underscores() {
        let t = TableNames::for_source("my...trino///analytics");
        assert_eq!(t.snapshot, "sync_my_trino_analytics_snapshot");
    }

    #[test]
    fn for_source_trims_leading_and_trailing_separators() {
        let t = TableNames::for_source("--pg-prod__");
        assert_eq!(t.snapshot, "sync_pg_prod_snapshot");
    }

    #[test]
    fn for_source_lowercases() {
        let t = TableNames::for_source("PG_Prod");
        assert_eq!(t.snapshot, "sync_pg_prod_snapshot");
    }

    #[test]
    fn for_source_uses_fallback_for_empty_name() {
        assert_eq!(TableNames::for_source("!!!").snapshot, "sync_unnamed_snapshot");
        assert_eq!(TableNames::for_source("").snapshot, "sync_unnamed_snapshot");
    }

    #[test]
    fn for_source_keeps_name_at_length_limit() {
        let exact = "b".repeat(MAX_SOURCE_IDENTIFIER_LEN);
        let t = TableNames::for_source(&exact);
        assert_eq!(t.snapshot, format!("sync_{exact}_snapshot"));
    }

    #[test]
    fn for_source_truncates_with_hash_suffix() {
        let long = "a".repeat(200);
        let t = TableNames::for_source(&long);

        let prefix_len = MAX_SOURCE_IDENTIFIER_LEN - TRUNCATED_HASH_LEN - 1;
        let expected_prefix = format!("sync_{}_", "a".repeat(prefix_len));
        let hash = t
            .snapshot
            .strip_prefix(expected_prefix.as_str())
            .and_then(|rest| rest.strip_suffix("_snapshot"))
            .expect("truncated name keeps the sync_ prefix and _snapshot suffix");

        assert_eq!(hash.len(), TRUNCATED_HASH_LEN);
        assert!(hash.chars().all(|c| matches!(c, '0'..='9' | 'a'..='f')));
        assert_eq!(
            t.snapshot.len(),
            "sync_".len() + MAX_SOURCE_IDENTIFIER_LEN + "_snapshot".len()
        );
    }

    #[test]
    fn for_source_truncation_is_deterministic() {
        let long = "z".repeat(100);
        assert_eq!(TableNames::for_source(&long), TableNames::for_source(&long));
    }

    #[test]
    fn for_source_long_names_remain_distinct() {
        let a = format!("{}-one", "x".repeat(120));
        let b = format!("{}-two", "x".repeat(120));
        let a_names = TableNames::for_source(&a);
        let b_names = TableNames::for_source(&b);
        assert_ne!(a_names.snapshot, b_names.snapshot);
    }

    #[test]
    fn default_shared_backward_compat() {
        let t = TableNames::default_shared();
        assert_eq!(t.snapshot, "snapshot");
        assert_eq!(t.cycle_log, "cycle_log");
        assert_eq!(t.pending_event, "pending_event");
    }

    #[test]
    fn create_ddl_defines_all_three_tables() {
        let t = TableNames::for_source("pg");
        let ddl = t.create_ddl();
        for table in [&t.snapshot, &t.cycle_log, &t.pending_event] {
            let define = format!("DEFINE TABLE IF NOT EXISTS {table} SCHEMAFULL;");
            assert!(ddl.contains(&define), "missing table definition: {define}");
        }
        assert!(ddl.contains(
            "DEFINE INDEX IF NOT EXISTS idx_sync_pg_snapshot_key ON sync_pg_snapshot \
             FIELDS origin_id, query_id, row_key UNIQUE;"
        ));
        // No format placeholder may survive interpolation.
        assert!(!ddl.contains('{'));
    }

    #[test]
    fn resolve_sql_replaces_all() {
        let t = TableNames::for_source("pg");
        let sql = "SELECT * FROM {snapshot} WHERE origin_id = $s; DELETE {pending_event}; UPDATE {cycle_log}";
        assert_eq!(
            t.resolve_sql(sql),
            "SELECT * FROM sync_pg_snapshot WHERE origin_id = $s; DELETE sync_pg_pending; UPDATE sync_pg_cycle_log"
        );
    }

    #[test]
    fn resolve_sql_replaces_repeats_and_ignores_unknown_placeholders() {
        let t = TableNames::default_shared();
        assert_eq!(
            t.resolve_sql("{snapshot},{snapshot},{other}"),
            "snapshot,snapshot,{other}"
        );
    }
}