1use crate::error::Result;
2
3use super::{StateConn, StateStore, pg_sql};
4
5#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
7pub struct SchemaColumn {
8 pub name: String,
9 #[serde(rename = "type")]
10 pub data_type: String,
11}
12
13pub fn schema_fingerprint(columns: &[SchemaColumn]) -> String {
29 use xxhash_rust::xxh3::Xxh3;
30
31 let mut sorted: Vec<&SchemaColumn> = columns.iter().collect();
32 sorted.sort_by(|a, b| a.name.cmp(&b.name));
33
34 let mut h = Xxh3::new();
35 for c in &sorted {
36 h.update(c.name.as_bytes());
37 h.update(b"\t");
38 h.update(c.data_type.as_bytes());
39 h.update(b"\n");
40 }
41 format!("xxh3:{:016x}", h.digest())
42}
43
44pub fn arrow_schema_to_columns(schema: &arrow::datatypes::Schema) -> Vec<SchemaColumn> {
62 schema
63 .fields()
64 .iter()
65 .map(|f| SchemaColumn {
66 name: f.name().clone(),
67 data_type: format!("{:?}", f.data_type()),
68 })
69 .collect()
70}
71
/// Difference between the schema stored for an export and its current schema,
/// as reported by `StateStore::detect_schema_change`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SchemaChange {
    /// Columns present now but not in the stored schema, each formatted as
    /// `"name (type)"`.
    pub added: Vec<String>,
    /// Names of stored columns that are no longer present.
    pub removed: Vec<String>,
    /// Columns present in both whose type string differs, as
    /// `(name, old_type, new_type)`.
    pub type_changed: Vec<(String, String, String)>,
}
79
80impl SchemaChange {
81 pub fn is_empty(&self) -> bool {
82 self.added.is_empty() && self.removed.is_empty() && self.type_changed.is_empty()
83 }
84}
85
86impl StateStore {
91 pub fn get_stored_schema(&self, export_name: &str) -> Result<Option<Vec<SchemaColumn>>> {
92 match &self.conn {
93 StateConn::Sqlite(c) => {
94 let mut stmt =
95 c.prepare("SELECT columns_json FROM export_schema WHERE export_name = ?1")?;
96 let result = stmt.query_row([export_name], |row| {
97 let json_str: String = row.get(0)?;
98 Ok(json_str)
99 });
100 match result {
101 Ok(json_str) => {
102 let cols: Vec<SchemaColumn> = serde_json::from_str(&json_str)?;
103 Ok(Some(cols))
104 }
105 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
106 Err(e) => Err(e.into()),
107 }
108 }
109 StateConn::Postgres(client) => {
110 let mut c = client.borrow_mut();
111 match c.query_opt(
112 "SELECT columns_json FROM export_schema WHERE export_name = $1",
113 &[&export_name],
114 )? {
115 Some(row) => {
116 let json_str: String = row.get(0);
117 let cols: Vec<SchemaColumn> = serde_json::from_str(&json_str)?;
118 Ok(Some(cols))
119 }
120 None => Ok(None),
121 }
122 }
123 }
124 }
125
126 pub fn store_schema(&self, export_name: &str, columns: &[SchemaColumn]) -> Result<()> {
127 let json = serde_json::to_string(columns)?;
128 let now = chrono::Utc::now().to_rfc3339();
129 let sql = "INSERT INTO export_schema (export_name, columns_json, updated_at)
130 VALUES (?1, ?2, ?3)
131 ON CONFLICT(export_name) DO UPDATE SET
132 columns_json = excluded.columns_json,
133 updated_at = excluded.updated_at";
134 match &self.conn {
135 StateConn::Sqlite(c) => {
136 c.execute(sql, rusqlite::params![export_name, json, now])?;
137 }
138 StateConn::Postgres(client) => {
139 let mut c = client.borrow_mut();
140 c.execute(&pg_sql(sql), &[&export_name, &json, &now])?;
141 }
142 }
143 Ok(())
144 }
145
146 pub fn detect_schema_change(
156 &self,
157 export_name: &str,
158 current: &[SchemaColumn],
159 ) -> Result<Option<SchemaChange>> {
160 let stored = match self.get_stored_schema(export_name)? {
161 Some(s) => s,
162 None => {
163 self.store_schema(export_name, current)?;
164 return Ok(None);
165 }
166 };
167
168 let stored_map: std::collections::HashMap<&str, &str> = stored
169 .iter()
170 .map(|c| (c.name.as_str(), c.data_type.as_str()))
171 .collect();
172 let current_map: std::collections::HashMap<&str, &str> = current
173 .iter()
174 .map(|c| (c.name.as_str(), c.data_type.as_str()))
175 .collect();
176
177 let added: Vec<String> = current
178 .iter()
179 .filter(|c| !stored_map.contains_key(c.name.as_str()))
180 .map(|c| format!("{} ({})", c.name, c.data_type))
181 .collect();
182
183 let removed: Vec<String> = stored
184 .iter()
185 .filter(|c| !current_map.contains_key(c.name.as_str()))
186 .map(|c| c.name.clone())
187 .collect();
188
189 let type_changed: Vec<(String, String, String)> = current
190 .iter()
191 .filter_map(|c| {
192 stored_map.get(c.name.as_str()).and_then(|old_type| {
193 if *old_type != c.data_type.as_str() {
194 Some((c.name.clone(), old_type.to_string(), c.data_type.clone()))
195 } else {
196 None
197 }
198 })
199 })
200 .collect();
201
202 let change = SchemaChange {
203 added,
204 removed,
205 type_changed,
206 };
207
208 if change.is_empty() {
209 Ok(None)
210 } else {
211 Ok(Some(change))
212 }
213 }
214}
215
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh, isolated in-memory state store.
    fn store() -> StateStore {
        StateStore::open_in_memory().expect("in-memory store")
    }

    /// Builds a column from a name and a type string.
    fn col(name: &str, ty: &str) -> SchemaColumn {
        SchemaColumn {
            name: name.into(),
            data_type: ty.into(),
        }
    }

    // ---- schema change detection ------------------------------------------

    #[test]
    fn first_schema_stored_no_change() {
        let s = store();
        let cols = vec![col("id", "Int64"), col("name", "Utf8")];
        let change = s.detect_schema_change("orders", &cols).unwrap();
        assert!(change.is_none(), "first run should detect no change");
        assert!(s.get_stored_schema("orders").unwrap().is_some());
    }

    #[test]
    fn same_schema_no_change() {
        let s = store();
        let cols = vec![col("id", "Int64")];
        s.detect_schema_change("t", &cols).unwrap();
        assert!(s.detect_schema_change("t", &cols).unwrap().is_none());
    }

    #[test]
    fn added_column_detected() {
        let s = store();
        s.detect_schema_change("t", &[col("id", "Int64")]).unwrap();

        let v2 = [col("id", "Int64"), col("email", "Utf8")];
        let change = s.detect_schema_change("t", &v2).unwrap().unwrap();
        assert_eq!(change.added.len(), 1);
        assert!(change.added[0].contains("email"));
    }

    #[test]
    fn removed_column_detected() {
        let s = store();
        s.detect_schema_change("t", &[col("id", "Int64"), col("old_field", "Utf8")])
            .unwrap();

        let change = s
            .detect_schema_change("t", &[col("id", "Int64")])
            .unwrap()
            .unwrap();
        assert_eq!(change.removed, vec!["old_field"]);
    }

    #[test]
    fn type_change_detected() {
        let s = store();
        s.detect_schema_change("t", &[col("price", "Float64")]).unwrap();

        let change = s
            .detect_schema_change("t", &[col("price", "Utf8")])
            .unwrap()
            .unwrap();
        assert_eq!(change.type_changed.len(), 1);
        assert_eq!(
            change.type_changed[0],
            ("price".into(), "Float64".into(), "Utf8".into())
        );
    }

    #[test]
    fn fail_policy_does_not_store_new_schema() {
        let s = store();
        s.detect_schema_change("t", &[col("id", "Int64")]).unwrap();

        let v2 = [col("id", "Int64"), col("new_col", "Utf8")];
        let change = s.detect_schema_change("t", &v2).unwrap().unwrap();
        assert_eq!(change.added.len(), 1);

        // Detection alone must leave the stored baseline untouched.
        let stored = s.get_stored_schema("t").unwrap().unwrap();
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].name, "id");

        let change2 = s.detect_schema_change("t", &v2).unwrap().unwrap();
        assert_eq!(
            change2.added.len(),
            1,
            "fail policy must re-detect on next run"
        );
    }

    #[test]
    fn warn_policy_stores_new_schema_after_change() {
        let s = store();
        s.detect_schema_change("t", &[col("id", "Int64")]).unwrap();

        let v2 = [col("id", "Int64"), col("extra", "Utf8")];
        let change = s.detect_schema_change("t", &v2).unwrap().unwrap();
        assert_eq!(change.added.len(), 1);

        // Accepting the change means explicitly persisting the new schema.
        s.store_schema("t", &v2).unwrap();

        let no_change = s.detect_schema_change("t", &v2).unwrap();
        assert!(
            no_change.is_none(),
            "after store, same schema must produce no change"
        );
    }

    // ---- fingerprinting -----------------------------------------------------

    #[test]
    fn fingerprint_format_is_xxh3_prefix_plus_16_hex() {
        let fp = schema_fingerprint(&[col("id", "Int64")]);
        assert!(fp.starts_with("xxh3:"), "fp = {fp}");
        let hex = fp.strip_prefix("xxh3:").unwrap();
        assert_eq!(hex.len(), 16, "fp = {fp}");
        let lower_hex = hex
            .chars()
            .all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase());
        assert!(lower_hex, "fp = {fp}");
    }

    #[test]
    fn fingerprint_is_order_independent() {
        let forward = [col("id", "Int64"), col("name", "Utf8")];
        let backward = [col("name", "Utf8"), col("id", "Int64")];
        assert_eq!(schema_fingerprint(&forward), schema_fingerprint(&backward));
    }

    #[test]
    fn fingerprint_changes_on_rename() {
        assert_ne!(
            schema_fingerprint(&[col("id", "Int64")]),
            schema_fingerprint(&[col("user_id", "Int64")])
        );
    }

    #[test]
    fn fingerprint_changes_on_retype() {
        assert_ne!(
            schema_fingerprint(&[col("price", "Int64")]),
            schema_fingerprint(&[col("price", "Float64")])
        );
    }

    #[test]
    fn fingerprint_changes_on_column_add_or_remove() {
        assert_ne!(
            schema_fingerprint(&[col("id", "Int64")]),
            schema_fingerprint(&[col("id", "Int64"), col("email", "Utf8")])
        );
    }

    #[test]
    fn fingerprint_is_stable_across_invocations() {
        let cols = [col("a", "Int64"), col("b", "Utf8"), col("c", "Float64")];
        let fps: Vec<String> = (0..3).map(|_| schema_fingerprint(&cols)).collect();
        assert_eq!(fps[0], fps[1]);
        assert_eq!(fps[1], fps[2]);
    }

    #[test]
    fn fingerprint_distinguishes_split_columns() {
        // Same concatenated characters, different column boundaries.
        assert_ne!(
            schema_fingerprint(&[col("ab", "Int64"), col("c", "Utf8")]),
            schema_fingerprint(&[col("a", "Int64"), col("bc", "Utf8")])
        );
    }

    #[test]
    fn fingerprint_empty_input_is_well_defined() {
        assert!(schema_fingerprint(&[]).starts_with("xxh3:"));
    }
}