Skip to main content

oversync_core/
table_names.rs

1//! Per-pipeline SurrealDB table names.
2//!
3//! Each sync pipeline gets its own set of tables for isolation,
4//! named by a deterministic pattern: `sync_{source}_{type}`.
5
/// Maximum length of the sanitized source identifier embedded in table names.
/// Longer identifiers are truncated and suffixed with a hash (see `sanitize_name`).
const MAX_SOURCE_IDENTIFIER_LEN: usize = 48;
/// Number of hex characters of the SHA-256 digest kept in the truncation suffix.
const TRUNCATED_HASH_LEN: usize = 12;
8
/// Table names for a sync pipeline's internal state.
///
/// Built via [`TableNames::for_source`] (per-pipeline isolated names) or
/// [`TableNames::default_shared`] (legacy un-prefixed names).
///
/// Derives `PartialEq`/`Eq` so name sets can be compared directly
/// (useful in tests and when detecting pipeline collisions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableNames {
	/// Row snapshots keyed by `(origin_id, query_id, row_key)`.
	pub snapshot: String,
	/// One record per sync cycle: timing, status, and row counters.
	pub cycle_log: String,
	/// Serialized event batches (`events_json`) per cycle.
	pub pending_event: String,
}
16
impl TableNames {
	/// Build table names from a source name.
	///
	/// Sanitizes the source name to a valid SurrealDB identifier
	/// (lowercase alphanumeric + underscore), then stamps it into the
	/// pattern `sync_{safe}_{type}` for all three tables.
	pub fn for_source(source_name: &str) -> Self {
		let safe = sanitize_name(source_name);
		Self {
			snapshot: format!("sync_{safe}_snapshot"),
			cycle_log: format!("sync_{safe}_cycle_log"),
			pending_event: format!("sync_{safe}_pending"),
		}
	}

	/// Default table names (backward compatible with existing single-pipeline deployments).
	///
	/// These un-prefixed names predate per-pipeline isolation; changing them
	/// would orphan data in deployed databases.
	pub fn default_shared() -> Self {
		Self {
			snapshot: "snapshot".into(),
			cycle_log: "cycle_log".into(),
			pending_event: "pending_event".into(),
		}
	}

	/// Generate SurrealQL DDL to create these tables (idempotent).
	///
	/// Every statement uses `IF NOT EXISTS`, so this is safe to run on every
	/// startup. Note: the trailing `\` in the string literal is Rust's
	/// line-continuation escape — it strips the newline AND the next line's
	/// leading whitespace, so the emitted DDL is one long line with statements
	/// separated only by `;`.
	pub fn create_ddl(&self) -> String {
		format!(
			"DEFINE TABLE IF NOT EXISTS {snap} SCHEMAFULL;\
			 DEFINE FIELD IF NOT EXISTS origin_id  ON {snap} TYPE string;\
			 DEFINE FIELD IF NOT EXISTS query_id   ON {snap} TYPE string;\
			 DEFINE FIELD IF NOT EXISTS row_key    ON {snap} TYPE string;\
			 DEFINE FIELD IF NOT EXISTS row_data   ON {snap} TYPE object FLEXIBLE;\
			 DEFINE FIELD IF NOT EXISTS row_hash   ON {snap} TYPE string;\
			 DEFINE FIELD IF NOT EXISTS cycle_id   ON {snap} TYPE int;\
			 DEFINE FIELD IF NOT EXISTS updated_at ON {snap} TYPE datetime DEFAULT time::now();\
			 DEFINE FIELD IF NOT EXISTS prev_hash  ON {snap} TYPE option<string>;\
			 DEFINE INDEX IF NOT EXISTS idx_{snap}_key   ON {snap} FIELDS origin_id, query_id, row_key UNIQUE;\
			 DEFINE INDEX IF NOT EXISTS idx_{snap}_cycle ON {snap} FIELDS origin_id, query_id, cycle_id;\
			 DEFINE TABLE IF NOT EXISTS {cl} SCHEMAFULL;\
			 DEFINE FIELD IF NOT EXISTS origin_id    ON {cl} TYPE string;\
			 DEFINE FIELD IF NOT EXISTS query_id     ON {cl} TYPE string;\
			 DEFINE FIELD IF NOT EXISTS cycle_id     ON {cl} TYPE int;\
			 DEFINE FIELD IF NOT EXISTS started_at   ON {cl} TYPE datetime;\
			 DEFINE FIELD IF NOT EXISTS finished_at  ON {cl} TYPE option<datetime>;\
			 DEFINE FIELD IF NOT EXISTS status       ON {cl} TYPE string DEFAULT 'running';\
			 DEFINE FIELD IF NOT EXISTS rows_fetched ON {cl} TYPE int DEFAULT 0;\
			 DEFINE FIELD IF NOT EXISTS rows_created ON {cl} TYPE int DEFAULT 0;\
			 DEFINE FIELD IF NOT EXISTS rows_updated ON {cl} TYPE int DEFAULT 0;\
			 DEFINE FIELD IF NOT EXISTS rows_deleted ON {cl} TYPE int DEFAULT 0;\
			 DEFINE INDEX IF NOT EXISTS idx_{cl}_source ON {cl} FIELDS origin_id, query_id, cycle_id UNIQUE;\
			 DEFINE TABLE IF NOT EXISTS {pe} SCHEMAFULL;\
			 DEFINE FIELD IF NOT EXISTS origin_id   ON {pe} TYPE string;\
			 DEFINE FIELD IF NOT EXISTS query_id    ON {pe} TYPE string;\
			 DEFINE FIELD IF NOT EXISTS cycle_id    ON {pe} TYPE int;\
			 DEFINE FIELD IF NOT EXISTS events_json ON {pe} TYPE string;\
			 DEFINE FIELD IF NOT EXISTS created_at  ON {pe} TYPE datetime DEFAULT time::now();\
			 DEFINE INDEX IF NOT EXISTS idx_{pe}_source ON {pe} FIELDS origin_id, query_id;",
			snap = self.snapshot,
			cl = self.cycle_log,
			pe = self.pending_event,
		)
	}

	/// Replace `{snapshot}`, `{cycle_log}`, `{pending_event}` placeholders in SQL.
	///
	/// Lets callers write SQL templates that are independent of which
	/// pipeline's tables they actually run against. Plain substring
	/// replacement — templates must not contain these brace sequences
	/// with any other meaning.
	pub fn resolve_sql(&self, template: &str) -> String {
		template
			.replace("{snapshot}", &self.snapshot)
			.replace("{cycle_log}", &self.cycle_log)
			.replace("{pending_event}", &self.pending_event)
	}
}
87
88fn sanitize_name(name: &str) -> String {
89	let mut sanitized = String::with_capacity(name.len());
90	let mut prev_was_underscore = false;
91
92	for c in name.chars() {
93		let mapped = if c.is_ascii_alphanumeric() || c == '_' {
94			c.to_ascii_lowercase()
95		} else {
96			'_'
97		};
98
99		if mapped == '_' {
100			if !prev_was_underscore {
101				sanitized.push('_');
102			}
103			prev_was_underscore = true;
104		} else {
105			sanitized.push(mapped);
106			prev_was_underscore = false;
107		}
108	}
109
110	let sanitized = sanitized.trim_matches('_');
111	if sanitized.is_empty() {
112		return "unnamed".to_string();
113	}
114
115	if sanitized.len() <= MAX_SOURCE_IDENTIFIER_LEN {
116		return sanitized.to_string();
117	}
118
119	let hash = short_hash(sanitized);
120	let prefix_len = MAX_SOURCE_IDENTIFIER_LEN - TRUNCATED_HASH_LEN - 1;
121	let prefix = &sanitized[..prefix_len];
122	format!("{prefix}_{hash}")
123}
124
125fn short_hash(name: &str) -> String {
126	use sha2::{Digest, Sha256};
127
128	let digest = Sha256::digest(name.as_bytes());
129	const_hex::encode(&digest[..TRUNCATED_HASH_LEN / 2])
130}
131
#[cfg(test)]
mod tests {
	use super::*;

	#[test]
	fn for_source_basic() {
		let names = TableNames::for_source("pg-prod");
		assert_eq!(names.snapshot, "sync_pg_prod_snapshot");
		assert_eq!(names.cycle_log, "sync_pg_prod_cycle_log");
		assert_eq!(names.pending_event, "sync_pg_prod_pending");
	}

	#[test]
	fn for_source_sanitizes_special_chars() {
		// Dots and slashes become single underscores.
		let names = TableNames::for_source("my.trino/analytics");
		assert_eq!(names.snapshot, "sync_my_trino_analytics_snapshot");
	}

	#[test]
	fn for_source_collapses_repeated_underscores() {
		// Runs of disallowed characters collapse to one underscore.
		let names = TableNames::for_source("my...trino///analytics");
		assert_eq!(names.snapshot, "sync_my_trino_analytics_snapshot");
	}

	#[test]
	fn for_source_lowercases() {
		let names = TableNames::for_source("PG_Prod");
		assert_eq!(names.snapshot, "sync_pg_prod_snapshot");
	}

	#[test]
	fn for_source_uses_fallback_for_empty_name() {
		// A name with no usable characters falls back to "unnamed".
		let names = TableNames::for_source("!!!");
		assert_eq!(names.snapshot, "sync_unnamed_snapshot");
	}

	#[test]
	fn for_source_truncates_with_hash_suffix() {
		let long_name = "a".repeat(200);
		let names = TableNames::for_source(&long_name);
		// 35-char prefix (48 - 12 - 1) followed by the hash suffix.
		let expected_prefix = format!("sync_{}_", "a".repeat(35));
		assert!(names.snapshot.starts_with(&expected_prefix));
		assert!(names.snapshot.ends_with("_snapshot"));
		assert!(names.snapshot.len() < 80);
	}

	#[test]
	fn for_source_long_names_remain_distinct() {
		// Names that only differ past the truncation point must not collide.
		let base = "x".repeat(120);
		let first = TableNames::for_source(&format!("{base}-one"));
		let second = TableNames::for_source(&format!("{base}-two"));
		assert_ne!(first.snapshot, second.snapshot);
	}

	#[test]
	fn default_shared_backward_compat() {
		let names = TableNames::default_shared();
		assert_eq!(names.snapshot, "snapshot");
		assert_eq!(names.cycle_log, "cycle_log");
		assert_eq!(names.pending_event, "pending_event");
	}

	#[test]
	fn resolve_sql_replaces_all() {
		let names = TableNames::for_source("pg");
		let template = "SELECT * FROM {snapshot} WHERE origin_id = $s; DELETE {pending_event}; UPDATE {cycle_log}";
		let resolved = names.resolve_sql(template);
		for expected in ["sync_pg_snapshot", "sync_pg_pending", "sync_pg_cycle_log"] {
			assert!(resolved.contains(expected));
		}
		assert!(!resolved.contains("{snapshot}"));
	}
}