oversync_core/
table_names.rs1#[derive(Debug, Clone)]
8pub struct TableNames {
9 pub snapshot: String,
10 pub cycle_log: String,
11 pub pending_event: String,
12}
13
14impl TableNames {
15 pub fn for_source(source_name: &str) -> Self {
20 let safe = sanitize_name(source_name);
21 Self {
22 snapshot: format!("sync_{safe}_snapshot"),
23 cycle_log: format!("sync_{safe}_cycle_log"),
24 pending_event: format!("sync_{safe}_pending"),
25 }
26 }
27
28 pub fn default_shared() -> Self {
30 Self {
31 snapshot: "snapshot".into(),
32 cycle_log: "cycle_log".into(),
33 pending_event: "pending_event".into(),
34 }
35 }
36
37 pub fn create_ddl(&self) -> String {
39 format!(
40 "DEFINE TABLE IF NOT EXISTS {snap} SCHEMAFULL;\
41 DEFINE FIELD IF NOT EXISTS origin_id ON {snap} TYPE string;\
42 DEFINE FIELD IF NOT EXISTS query_id ON {snap} TYPE string;\
43 DEFINE FIELD IF NOT EXISTS row_key ON {snap} TYPE string;\
44 DEFINE FIELD IF NOT EXISTS row_data ON {snap} TYPE object FLEXIBLE;\
45 DEFINE FIELD IF NOT EXISTS row_hash ON {snap} TYPE string;\
46 DEFINE FIELD IF NOT EXISTS cycle_id ON {snap} TYPE int;\
47 DEFINE FIELD IF NOT EXISTS updated_at ON {snap} TYPE datetime DEFAULT time::now();\
48 DEFINE FIELD IF NOT EXISTS prev_hash ON {snap} TYPE option<string>;\
49 DEFINE INDEX IF NOT EXISTS idx_{snap}_key ON {snap} FIELDS origin_id, query_id, row_key UNIQUE;\
50 DEFINE INDEX IF NOT EXISTS idx_{snap}_cycle ON {snap} FIELDS origin_id, query_id, cycle_id;\
51 DEFINE TABLE IF NOT EXISTS {cl} SCHEMAFULL;\
52 DEFINE FIELD IF NOT EXISTS origin_id ON {cl} TYPE string;\
53 DEFINE FIELD IF NOT EXISTS query_id ON {cl} TYPE string;\
54 DEFINE FIELD IF NOT EXISTS cycle_id ON {cl} TYPE int;\
55 DEFINE FIELD IF NOT EXISTS started_at ON {cl} TYPE datetime;\
56 DEFINE FIELD IF NOT EXISTS finished_at ON {cl} TYPE option<datetime>;\
57 DEFINE FIELD IF NOT EXISTS status ON {cl} TYPE string DEFAULT 'running';\
58 DEFINE FIELD IF NOT EXISTS rows_fetched ON {cl} TYPE int DEFAULT 0;\
59 DEFINE FIELD IF NOT EXISTS rows_created ON {cl} TYPE int DEFAULT 0;\
60 DEFINE FIELD IF NOT EXISTS rows_updated ON {cl} TYPE int DEFAULT 0;\
61 DEFINE FIELD IF NOT EXISTS rows_deleted ON {cl} TYPE int DEFAULT 0;\
62 DEFINE INDEX IF NOT EXISTS idx_{cl}_source ON {cl} FIELDS origin_id, query_id, cycle_id UNIQUE;\
63 DEFINE TABLE IF NOT EXISTS {pe} SCHEMAFULL;\
64 DEFINE FIELD IF NOT EXISTS origin_id ON {pe} TYPE string;\
65 DEFINE FIELD IF NOT EXISTS query_id ON {pe} TYPE string;\
66 DEFINE FIELD IF NOT EXISTS cycle_id ON {pe} TYPE int;\
67 DEFINE FIELD IF NOT EXISTS events_json ON {pe} TYPE string;\
68 DEFINE FIELD IF NOT EXISTS created_at ON {pe} TYPE datetime DEFAULT time::now();\
69 DEFINE INDEX IF NOT EXISTS idx_{pe}_source ON {pe} FIELDS origin_id, query_id;",
70 snap = self.snapshot,
71 cl = self.cycle_log,
72 pe = self.pending_event,
73 )
74 }
75
76 pub fn resolve_sql(&self, template: &str) -> String {
78 template
79 .replace("{snapshot}", &self.snapshot)
80 .replace("{cycle_log}", &self.cycle_log)
81 .replace("{pending_event}", &self.pending_event)
82 }
83}
84
85fn sanitize_name(name: &str) -> String {
86 name.chars()
87 .map(|c| {
88 if c.is_ascii_alphanumeric() || c == '_' {
89 c.to_ascii_lowercase()
90 } else {
91 '_'
92 }
93 })
94 .collect()
95}
96
97#[cfg(test)]
98mod tests {
99 use super::*;
100
101 #[test]
102 fn for_source_basic() {
103 let t = TableNames::for_source("pg-prod");
104 assert_eq!(t.snapshot, "sync_pg_prod_snapshot");
105 assert_eq!(t.cycle_log, "sync_pg_prod_cycle_log");
106 assert_eq!(t.pending_event, "sync_pg_prod_pending");
107 }
108
109 #[test]
110 fn for_source_sanitizes_special_chars() {
111 let t = TableNames::for_source("my.trino/analytics");
112 assert_eq!(t.snapshot, "sync_my_trino_analytics_snapshot");
113 }
114
115 #[test]
116 fn for_source_lowercases() {
117 let t = TableNames::for_source("PG_Prod");
118 assert_eq!(t.snapshot, "sync_pg_prod_snapshot");
119 }
120
121 #[test]
122 fn default_shared_backward_compat() {
123 let t = TableNames::default_shared();
124 assert_eq!(t.snapshot, "snapshot");
125 assert_eq!(t.cycle_log, "cycle_log");
126 assert_eq!(t.pending_event, "pending_event");
127 }
128
129 #[test]
130 fn resolve_sql_replaces_all() {
131 let t = TableNames::for_source("pg");
132 let sql = "SELECT * FROM {snapshot} WHERE origin_id = $s; DELETE {pending_event}; UPDATE {cycle_log}";
133 let resolved = t.resolve_sql(sql);
134 assert!(resolved.contains("sync_pg_snapshot"));
135 assert!(resolved.contains("sync_pg_pending"));
136 assert!(resolved.contains("sync_pg_cycle_log"));
137 assert!(!resolved.contains("{snapshot}"));
138 }
139}