Skip to main content

oversync_core/
table_names.rs

//! Per-pipeline SurrealDB table names.
//!
//! Each sync pipeline gets its own set of tables for isolation,
//! named by a deterministic pattern: `sync_{source}_{type}`.
5
/// Table names for a sync pipeline's internal state.
///
/// Each field is the name of one SurrealDB table. The values are spliced
/// verbatim into the DDL produced by `create_ddl` and into SQL templates via
/// `resolve_sql`, so they must already be valid identifiers.
///
/// `PartialEq`/`Eq` are derived so that two sets of names can be compared
/// directly (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableNames {
	/// Table holding the per-row snapshot (`row_data`, `row_hash`, ...),
	/// uniquely indexed by `origin_id`, `query_id` and `row_key`.
	pub snapshot: String,
	/// Table holding one record per sync cycle: timestamps, `status` and the
	/// fetched/created/updated/deleted row counters.
	pub cycle_log: String,
	/// Table holding pending events as an `events_json` string per cycle.
	pub pending_event: String,
}
13
14impl TableNames {
15	/// Build table names from a source name.
16	///
17	/// Sanitizes the source name to a valid SurrealDB identifier
18	/// (lowercase alphanumeric + underscore).
19	pub fn for_source(source_name: &str) -> Self {
20		let safe = sanitize_name(source_name);
21		Self {
22			snapshot: format!("sync_{safe}_snapshot"),
23			cycle_log: format!("sync_{safe}_cycle_log"),
24			pending_event: format!("sync_{safe}_pending"),
25		}
26	}
27
28	/// Default table names (backward compatible with existing single-pipeline deployments).
29	pub fn default_shared() -> Self {
30		Self {
31			snapshot: "snapshot".into(),
32			cycle_log: "cycle_log".into(),
33			pending_event: "pending_event".into(),
34		}
35	}
36
37	/// Generate SurrealQL DDL to create these tables (idempotent).
38	pub fn create_ddl(&self) -> String {
39		format!(
40			"DEFINE TABLE IF NOT EXISTS {snap} SCHEMAFULL;\
41			 DEFINE FIELD IF NOT EXISTS origin_id  ON {snap} TYPE string;\
42			 DEFINE FIELD IF NOT EXISTS query_id   ON {snap} TYPE string;\
43			 DEFINE FIELD IF NOT EXISTS row_key    ON {snap} TYPE string;\
44			 DEFINE FIELD IF NOT EXISTS row_data   ON {snap} TYPE object FLEXIBLE;\
45			 DEFINE FIELD IF NOT EXISTS row_hash   ON {snap} TYPE string;\
46			 DEFINE FIELD IF NOT EXISTS cycle_id   ON {snap} TYPE int;\
47			 DEFINE FIELD IF NOT EXISTS updated_at ON {snap} TYPE datetime DEFAULT time::now();\
48			 DEFINE FIELD IF NOT EXISTS prev_hash  ON {snap} TYPE option<string>;\
49			 DEFINE INDEX IF NOT EXISTS idx_{snap}_key   ON {snap} FIELDS origin_id, query_id, row_key UNIQUE;\
50			 DEFINE INDEX IF NOT EXISTS idx_{snap}_cycle ON {snap} FIELDS origin_id, query_id, cycle_id;\
51			 DEFINE TABLE IF NOT EXISTS {cl} SCHEMAFULL;\
52			 DEFINE FIELD IF NOT EXISTS origin_id    ON {cl} TYPE string;\
53			 DEFINE FIELD IF NOT EXISTS query_id     ON {cl} TYPE string;\
54			 DEFINE FIELD IF NOT EXISTS cycle_id     ON {cl} TYPE int;\
55			 DEFINE FIELD IF NOT EXISTS started_at   ON {cl} TYPE datetime;\
56			 DEFINE FIELD IF NOT EXISTS finished_at  ON {cl} TYPE option<datetime>;\
57			 DEFINE FIELD IF NOT EXISTS status       ON {cl} TYPE string DEFAULT 'running';\
58			 DEFINE FIELD IF NOT EXISTS rows_fetched ON {cl} TYPE int DEFAULT 0;\
59			 DEFINE FIELD IF NOT EXISTS rows_created ON {cl} TYPE int DEFAULT 0;\
60			 DEFINE FIELD IF NOT EXISTS rows_updated ON {cl} TYPE int DEFAULT 0;\
61			 DEFINE FIELD IF NOT EXISTS rows_deleted ON {cl} TYPE int DEFAULT 0;\
62			 DEFINE INDEX IF NOT EXISTS idx_{cl}_source ON {cl} FIELDS origin_id, query_id, cycle_id UNIQUE;\
63			 DEFINE TABLE IF NOT EXISTS {pe} SCHEMAFULL;\
64			 DEFINE FIELD IF NOT EXISTS origin_id   ON {pe} TYPE string;\
65			 DEFINE FIELD IF NOT EXISTS query_id    ON {pe} TYPE string;\
66			 DEFINE FIELD IF NOT EXISTS cycle_id    ON {pe} TYPE int;\
67			 DEFINE FIELD IF NOT EXISTS events_json ON {pe} TYPE string;\
68			 DEFINE FIELD IF NOT EXISTS created_at  ON {pe} TYPE datetime DEFAULT time::now();\
69			 DEFINE INDEX IF NOT EXISTS idx_{pe}_source ON {pe} FIELDS origin_id, query_id;",
70			snap = self.snapshot,
71			cl = self.cycle_log,
72			pe = self.pending_event,
73		)
74	}
75
76	/// Replace `{snapshot}`, `{cycle_log}`, `{pending_event}` placeholders in SQL.
77	pub fn resolve_sql(&self, template: &str) -> String {
78		template
79			.replace("{snapshot}", &self.snapshot)
80			.replace("{cycle_log}", &self.cycle_log)
81			.replace("{pending_event}", &self.pending_event)
82	}
83}
84
/// Turn an arbitrary source name into identifier-safe text.
///
/// ASCII letters are lowercased, ASCII digits and `_` are kept as they are,
/// and every other character (including each non-ASCII character) becomes a
/// single `_`. The output therefore has exactly one character per input
/// character.
fn sanitize_name(name: &str) -> String {
	let mut cleaned = String::with_capacity(name.len());
	for ch in name.chars() {
		let mapped = match ch {
			'a'..='z' | '0'..='9' | '_' => ch,
			'A'..='Z' => ch.to_ascii_lowercase(),
			_ => '_',
		};
		cleaned.push(mapped);
	}
	cleaned
}
96
#[cfg(test)]
mod tests {
	use super::*;

	/// A typical source name: the dash is replaced and all three names follow
	/// the `sync_{source}_{suffix}` pattern.
	#[test]
	fn for_source_basic() {
		let t = TableNames::for_source("pg-prod");
		assert_eq!(t.snapshot, "sync_pg_prod_snapshot");
		assert_eq!(t.cycle_log, "sync_pg_prod_cycle_log");
		assert_eq!(t.pending_event, "sync_pg_prod_pending");
	}

	/// Punctuation is replaced in every derived name, not just the snapshot table.
	#[test]
	fn for_source_sanitizes_special_chars() {
		let t = TableNames::for_source("my.trino/analytics");
		assert_eq!(t.snapshot, "sync_my_trino_analytics_snapshot");
		assert_eq!(t.cycle_log, "sync_my_trino_analytics_cycle_log");
		assert_eq!(t.pending_event, "sync_my_trino_analytics_pending");
	}

	/// Upper-case input is lowercased in every derived name.
	#[test]
	fn for_source_lowercases() {
		let t = TableNames::for_source("PG_Prod");
		assert_eq!(t.snapshot, "sync_pg_prod_snapshot");
		assert_eq!(t.cycle_log, "sync_pg_prod_cycle_log");
		assert_eq!(t.pending_event, "sync_pg_prod_pending");
	}

	/// Each disallowed character, including a multi-byte one, maps to exactly one `_`.
	#[test]
	fn sanitize_name_maps_each_invalid_char_to_underscore() {
		assert_eq!(sanitize_name("caf\u{e9} 9"), "caf__9");
		assert_eq!(sanitize_name(""), "");
	}

	/// The shared names must stay exactly as existing deployments expect them.
	#[test]
	fn default_shared_backward_compat() {
		let t = TableNames::default_shared();
		assert_eq!(t.snapshot, "snapshot");
		assert_eq!(t.cycle_log, "cycle_log");
		assert_eq!(t.pending_event, "pending_event");
	}

	/// All three placeholders are substituted and the rest of the SQL is untouched.
	#[test]
	fn resolve_sql_replaces_all() {
		let t = TableNames::for_source("pg");
		let sql = "SELECT * FROM {snapshot} WHERE origin_id = $s; DELETE {pending_event}; UPDATE {cycle_log}";
		assert_eq!(
			t.resolve_sql(sql),
			"SELECT * FROM sync_pg_snapshot WHERE origin_id = $s; DELETE sync_pg_pending; UPDATE sync_pg_cycle_log"
		);
	}

	/// Repeated placeholders are all replaced; unknown ones and plain SQL pass through.
	#[test]
	fn resolve_sql_handles_repeats_and_unknown_placeholders() {
		let t = TableNames::for_source("pg");
		assert_eq!(
			t.resolve_sql("{snapshot} JOIN {snapshot}"),
			"sync_pg_snapshot JOIN sync_pg_snapshot"
		);
		assert_eq!(t.resolve_sql("SELECT 1 FROM {other}"), "SELECT 1 FROM {other}");
		assert_eq!(t.resolve_sql(""), "");
	}

	/// The DDL defines all three pipeline tables and leaves no unresolved placeholder.
	#[test]
	fn create_ddl_uses_pipeline_table_names() {
		let ddl = TableNames::for_source("pg").create_ddl();
		assert!(ddl.starts_with("DEFINE TABLE IF NOT EXISTS sync_pg_snapshot SCHEMAFULL;"));
		assert!(ddl.contains("DEFINE TABLE IF NOT EXISTS sync_pg_cycle_log SCHEMAFULL;"));
		assert!(ddl.contains("DEFINE TABLE IF NOT EXISTS sync_pg_pending SCHEMAFULL;"));
		assert!(ddl.contains("idx_sync_pg_snapshot_key"));
		assert!(ddl.ends_with("ON sync_pg_pending FIELDS origin_id, query_id;"));
		assert!(!ddl.contains('{'));
		assert!(!ddl.contains('}'));
	}
}