// rivet/state/schema.rs

use crate::error::Result;

use super::{StateConn, StateStore, pg_sql};

5/// One column in a schema snapshot.
6#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
7pub struct SchemaColumn {
8    pub name: String,
9    #[serde(rename = "type")]
10    pub data_type: String,
11}
12
13/// Compute the canonical schema fingerprint for a set of columns.
14///
15/// The fingerprint is `"xxh3:<16-char-lowercase-hex>"`.  The algorithm prefix
16/// is part of the format so future hashers (sha256, blake3) can coexist —
17/// readers MUST verify the prefix before interpreting the hex body.
18///
19/// Canonicalization: columns are sorted by name (case-sensitive) and
20/// serialized as `<name>\t<data_type>\n` joined with no separator.  Column
21/// **order** in the source schema does not affect the fingerprint, but
22/// column **names** and **types** do; renaming or retyping a column changes
23/// the fingerprint.
24///
25/// This is the value written to the manifest's `schema_fingerprint` field
26/// (ADR-0012 M3) and is what `--validate` compares against to detect
27/// schema drift between the time of write and the time of verify.
28pub fn schema_fingerprint(columns: &[SchemaColumn]) -> String {
29    use xxhash_rust::xxh3::Xxh3;
30
31    let mut sorted: Vec<&SchemaColumn> = columns.iter().collect();
32    sorted.sort_by(|a, b| a.name.cmp(&b.name));
33
34    let mut h = Xxh3::new();
35    for c in &sorted {
36        h.update(c.name.as_bytes());
37        h.update(b"\t");
38        h.update(c.data_type.as_bytes());
39        h.update(b"\n");
40    }
41    format!("xxh3:{:016x}", h.digest())
42}
43
44/// Convert an Arrow schema (the dest-facing one, after internal columns are
45/// stripped — see `pipeline::sink`) into the `Vec<SchemaColumn>` the rest of
46/// the trust contract uses (`schema_fingerprint`, `store_schema`,
47/// `detect_schema_change`).
48///
49/// This is the canonical bridge between the Arrow type system and Rivet's
50/// schema-evidence representation.  It used to be inlined in three places
51/// (`pipeline/single.rs`, `pipeline/chunked/exec.rs`,
52/// `pipeline/chunked/parallel_checkpoint.rs`); each copy was a regression
53/// risk because changing the data-type representation in one would silently
54/// shift the fingerprint there but not in the others.  Centralising the
55/// conversion keeps the fingerprint stable across executors.
56///
57/// Format note: `data_type` is rendered with `{:?}` (Arrow's `Debug`), e.g.
58/// `Int64`, `Utf8`, `Timestamp(Microsecond, None)`.  This is what every
59/// existing manifest already records; switching to `Display` would shift
60/// every fingerprint in the world and is intentionally avoided.
61pub fn arrow_schema_to_columns(schema: &arrow::datatypes::Schema) -> Vec<SchemaColumn> {
62    schema
63        .fields()
64        .iter()
65        .map(|f| SchemaColumn {
66            name: f.name().clone(),
67            data_type: format!("{:?}", f.data_type()),
68        })
69        .collect()
70}
71
/// Diff between two schema snapshots.
///
/// As produced by `StateStore::detect_schema_change`, each list follows the
/// column order of the snapshot it was derived from.
#[derive(Debug)]
pub struct SchemaChange {
    /// Columns present now but absent from the stored snapshot.
    /// Each entry is rendered as `"<name> (<data_type>)"`.
    pub added: Vec<String>,
    /// Names of columns present in the stored snapshot but absent now.
    pub removed: Vec<String>,
    /// Columns whose type differs: `(name, old_type, new_type)`.
    pub type_changed: Vec<(String, String, String)>,
}

impl SchemaChange {
    /// Returns `true` when no column was added, removed, or retyped.
    pub fn is_empty(&self) -> bool {
        let nothing_added = self.added.is_empty();
        let nothing_removed = self.removed.is_empty();
        let nothing_retyped = self.type_changed.is_empty();
        nothing_added && nothing_removed && nothing_retyped
    }
}

86/// Schema history store — reads and writes `export_schema`.
87///
88/// Captures a schema snapshot per export on each run and surfaces structural
89/// drift (added/removed/retyped columns) by diffing against the stored snapshot.
90impl StateStore {
91    pub fn get_stored_schema(&self, export_name: &str) -> Result<Option<Vec<SchemaColumn>>> {
92        match &self.conn {
93            StateConn::Sqlite(c) => {
94                let mut stmt =
95                    c.prepare("SELECT columns_json FROM export_schema WHERE export_name = ?1")?;
96                let result = stmt.query_row([export_name], |row| {
97                    let json_str: String = row.get(0)?;
98                    Ok(json_str)
99                });
100                match result {
101                    Ok(json_str) => {
102                        let cols: Vec<SchemaColumn> = serde_json::from_str(&json_str)?;
103                        Ok(Some(cols))
104                    }
105                    Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
106                    Err(e) => Err(e.into()),
107                }
108            }
109            StateConn::Postgres(client) => {
110                let mut c = client.borrow_mut();
111                match c.query_opt(
112                    "SELECT columns_json FROM export_schema WHERE export_name = $1",
113                    &[&export_name],
114                )? {
115                    Some(row) => {
116                        let json_str: String = row.get(0);
117                        let cols: Vec<SchemaColumn> = serde_json::from_str(&json_str)?;
118                        Ok(Some(cols))
119                    }
120                    None => Ok(None),
121                }
122            }
123        }
124    }
125
126    pub fn store_schema(&self, export_name: &str, columns: &[SchemaColumn]) -> Result<()> {
127        let json = serde_json::to_string(columns)?;
128        let now = chrono::Utc::now().to_rfc3339();
129        let sql = "INSERT INTO export_schema (export_name, columns_json, updated_at)
130             VALUES (?1, ?2, ?3)
131             ON CONFLICT(export_name) DO UPDATE SET
132                columns_json = excluded.columns_json,
133                updated_at = excluded.updated_at";
134        match &self.conn {
135            StateConn::Sqlite(c) => {
136                c.execute(sql, rusqlite::params![export_name, json, now])?;
137            }
138            StateConn::Postgres(client) => {
139                let mut c = client.borrow_mut();
140                c.execute(&pg_sql(sql), &[&export_name, &json, &now])?;
141            }
142        }
143        Ok(())
144    }
145
146    /// Detect structural drift versus the stored snapshot.
147    ///
148    /// On the first run (no stored snapshot) the current schema is stored and
149    /// `Ok(None)` is returned. On subsequent runs a diff is computed and returned
150    /// as `Ok(Some(change))` when columns differ — but the stored snapshot is
151    /// **not** updated automatically. Callers must call [`store_schema`] explicitly
152    /// after deciding whether to accept the change (policy `warn`/`continue`) or
153    /// reject it (policy `fail`, which intentionally leaves the old snapshot so the
154    /// next run detects the same change again).
155    pub fn detect_schema_change(
156        &self,
157        export_name: &str,
158        current: &[SchemaColumn],
159    ) -> Result<Option<SchemaChange>> {
160        let stored = match self.get_stored_schema(export_name)? {
161            Some(s) => s,
162            None => {
163                self.store_schema(export_name, current)?;
164                return Ok(None);
165            }
166        };
167
168        let stored_map: std::collections::HashMap<&str, &str> = stored
169            .iter()
170            .map(|c| (c.name.as_str(), c.data_type.as_str()))
171            .collect();
172        let current_map: std::collections::HashMap<&str, &str> = current
173            .iter()
174            .map(|c| (c.name.as_str(), c.data_type.as_str()))
175            .collect();
176
177        let added: Vec<String> = current
178            .iter()
179            .filter(|c| !stored_map.contains_key(c.name.as_str()))
180            .map(|c| format!("{} ({})", c.name, c.data_type))
181            .collect();
182
183        let removed: Vec<String> = stored
184            .iter()
185            .filter(|c| !current_map.contains_key(c.name.as_str()))
186            .map(|c| c.name.clone())
187            .collect();
188
189        let type_changed: Vec<(String, String, String)> = current
190            .iter()
191            .filter_map(|c| {
192                stored_map.get(c.name.as_str()).and_then(|old_type| {
193                    if *old_type != c.data_type.as_str() {
194                        Some((c.name.clone(), old_type.to_string(), c.data_type.clone()))
195                    } else {
196                        None
197                    }
198                })
199            })
200            .collect();
201
202        let change = SchemaChange {
203            added,
204            removed,
205            type_changed,
206        };
207
208        if change.is_empty() {
209            Ok(None)
210        } else {
211            Ok(Some(change))
212        }
213    }
214}
215
#[cfg(test)]
mod tests {
    use super::*;

    fn store() -> StateStore {
        StateStore::open_in_memory().expect("in-memory store")
    }

    /// Shorthand constructor for a `SchemaColumn`.
    fn col(name: &str, ty: &str) -> SchemaColumn {
        SchemaColumn {
            name: name.into(),
            data_type: ty.into(),
        }
    }

    // ── detect_schema_change / store_schema ─────────────────────────────────

    #[test]
    fn first_schema_stored_no_change() {
        let s = store();
        let cols = vec![col("id", "Int64"), col("name", "Utf8")];
        let change = s.detect_schema_change("orders", &cols).unwrap();
        assert!(change.is_none(), "first run should detect no change");
        // The baseline must round-trip exactly, including column order.
        assert_eq!(s.get_stored_schema("orders").unwrap(), Some(cols));
    }

    #[test]
    fn missing_export_has_no_stored_schema() {
        let s = store();
        assert!(s.get_stored_schema("never_seen").unwrap().is_none());
    }

    #[test]
    fn same_schema_no_change() {
        let s = store();
        let cols = vec![col("id", "Int64")];
        s.detect_schema_change("t", &cols).unwrap();
        let change = s.detect_schema_change("t", &cols).unwrap();
        assert!(change.is_none());
    }

    #[test]
    fn added_column_detected() {
        let s = store();
        s.detect_schema_change("t", &[col("id", "Int64")]).unwrap();

        let v2 = vec![col("id", "Int64"), col("email", "Utf8")];
        let change = s.detect_schema_change("t", &v2).unwrap().unwrap();
        assert_eq!(change.added, vec!["email (Utf8)"]);
        assert!(change.removed.is_empty());
        assert!(change.type_changed.is_empty());
    }

    #[test]
    fn removed_column_detected() {
        let s = store();
        s.detect_schema_change("t", &[col("id", "Int64"), col("old_field", "Utf8")])
            .unwrap();

        let change = s
            .detect_schema_change("t", &[col("id", "Int64")])
            .unwrap()
            .unwrap();
        assert_eq!(change.removed, vec!["old_field"]);
        assert!(change.added.is_empty());
        assert!(change.type_changed.is_empty());
    }

    #[test]
    fn type_change_detected() {
        let s = store();
        s.detect_schema_change("t", &[col("price", "Float64")])
            .unwrap();

        let change = s
            .detect_schema_change("t", &[col("price", "Utf8")])
            .unwrap()
            .unwrap();
        assert_eq!(
            change.type_changed,
            vec![(
                "price".to_string(),
                "Float64".to_string(),
                "Utf8".to_string()
            )]
        );
        assert!(change.added.is_empty());
        assert!(change.removed.is_empty());
    }

    #[test]
    fn combined_add_remove_retype_detected() {
        let s = store();
        s.detect_schema_change(
            "t",
            &[
                col("id", "Int64"),
                col("price", "Float64"),
                col("legacy", "Utf8"),
            ],
        )
        .unwrap();

        let change = s
            .detect_schema_change(
                "t",
                &[
                    col("id", "Int64"),
                    col("price", "Utf8"),
                    col("email", "Utf8"),
                ],
            )
            .unwrap()
            .unwrap();
        assert_eq!(change.added, vec!["email (Utf8)"]);
        assert_eq!(change.removed, vec!["legacy"]);
        assert_eq!(
            change.type_changed,
            vec![(
                "price".to_string(),
                "Float64".to_string(),
                "Utf8".to_string()
            )]
        );
    }

    #[test]
    fn fail_policy_does_not_store_new_schema() {
        let s = store();
        s.detect_schema_change("t", &[col("id", "Int64")]).unwrap();

        let v2 = vec![col("id", "Int64"), col("new_col", "Utf8")];
        let change = s.detect_schema_change("t", &v2).unwrap().unwrap();
        assert_eq!(change.added.len(), 1);

        // Not calling store_schema models the `fail` policy: the old snapshot stays.
        let stored = s.get_stored_schema("t").unwrap().unwrap();
        assert_eq!(stored, vec![col("id", "Int64")]);

        let change2 = s.detect_schema_change("t", &v2).unwrap().unwrap();
        assert_eq!(
            change2.added.len(),
            1,
            "fail policy must re-detect on next run"
        );
    }

    #[test]
    fn warn_policy_stores_new_schema_after_change() {
        let s = store();
        s.detect_schema_change("t", &[col("id", "Int64")]).unwrap();

        let v2 = vec![col("id", "Int64"), col("extra", "Utf8")];
        let change = s.detect_schema_change("t", &v2).unwrap().unwrap();
        assert_eq!(change.added.len(), 1);

        // Accepting the change (`warn`/`continue`) means storing the new snapshot.
        s.store_schema("t", &v2).unwrap();

        let no_change = s.detect_schema_change("t", &v2).unwrap();
        assert!(
            no_change.is_none(),
            "after store, same schema must produce no change"
        );
    }

    // ── SchemaChange ────────────────────────────────────────────────────────

    #[test]
    fn schema_change_is_empty_only_when_all_lists_are_empty() {
        let empty = SchemaChange {
            added: vec![],
            removed: vec![],
            type_changed: vec![],
        };
        assert!(empty.is_empty());

        let removed_only = SchemaChange {
            added: vec![],
            removed: vec!["x".to_string()],
            type_changed: vec![],
        };
        assert!(!removed_only.is_empty());
    }

    // ── arrow_schema_to_columns ─────────────────────────────────────────────

    #[test]
    fn arrow_schema_conversion_uses_debug_type_names_in_field_order() {
        use arrow::datatypes::{DataType, Field, Schema};

        // Pins the `{:?}` rendering that existing manifests' fingerprints
        // depend on, and checks that field order is preserved.
        let schema = Schema::new(vec![
            Field::new("name", DataType::Utf8, true),
            Field::new("id", DataType::Int64, false),
        ]);
        assert_eq!(
            arrow_schema_to_columns(&schema),
            vec![col("name", "Utf8"), col("id", "Int64")]
        );
    }

    // ── schema_fingerprint ──────────────────────────────────────────────────

    #[test]
    fn fingerprint_format_is_xxh3_prefix_plus_16_hex() {
        let fp = schema_fingerprint(&[col("id", "Int64")]);
        assert!(fp.starts_with("xxh3:"), "fp = {fp}");
        let hex = &fp["xxh3:".len()..];
        assert_eq!(hex.len(), 16, "fp = {fp}");
        assert!(
            hex.chars()
                .all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase()),
            "fp = {fp}"
        );
    }

    #[test]
    fn fingerprint_is_order_independent() {
        let a = vec![col("id", "Int64"), col("name", "Utf8")];
        let b = vec![col("name", "Utf8"), col("id", "Int64")];
        assert_eq!(schema_fingerprint(&a), schema_fingerprint(&b));
    }

    #[test]
    fn fingerprint_changes_on_rename() {
        let a = vec![col("id", "Int64")];
        let b = vec![col("user_id", "Int64")];
        assert_ne!(schema_fingerprint(&a), schema_fingerprint(&b));
    }

    #[test]
    fn fingerprint_changes_on_retype() {
        let a = vec![col("price", "Int64")];
        let b = vec![col("price", "Float64")];
        assert_ne!(schema_fingerprint(&a), schema_fingerprint(&b));
    }

    #[test]
    fn fingerprint_changes_on_column_add_or_remove() {
        let a = vec![col("id", "Int64")];
        let b = vec![col("id", "Int64"), col("email", "Utf8")];
        assert_ne!(schema_fingerprint(&a), schema_fingerprint(&b));
    }

    #[test]
    fn fingerprint_is_stable_across_invocations() {
        // Guards against accidental non-determinism within a process, such
        // as HashMap iteration order leaking into the hash.  It does not
        // detect changes across processes or xxh3 versions; a golden-value
        // test would be needed for that.
        let cols = vec![col("a", "Int64"), col("b", "Utf8"), col("c", "Float64")];
        let fp1 = schema_fingerprint(&cols);
        let fp2 = schema_fingerprint(&cols);
        let fp3 = schema_fingerprint(&cols);
        assert_eq!(fp1, fp2);
        assert_eq!(fp2, fp3);
    }

    #[test]
    fn fingerprint_distinguishes_split_columns() {
        // Defends against a naive concat-without-separator implementation:
        // "ab" + "c" must hash differently from "a" + "bc".
        let a = vec![col("ab", "Int64"), col("c", "Utf8")];
        let b = vec![col("a", "Int64"), col("bc", "Utf8")];
        assert_ne!(schema_fingerprint(&a), schema_fingerprint(&b));
    }

    #[test]
    fn fingerprint_empty_input_is_well_defined() {
        // An empty schema (no columns) must produce a well-formed,
        // deterministic value rather than panicking; the manifest writer may
        // need to emit a placeholder fingerprint for degenerate plans.
        let fp = schema_fingerprint(&[]);
        assert!(fp.starts_with("xxh3:"), "fp = {fp}");
        assert_eq!(fp.len(), "xxh3:".len() + 16, "fp = {fp}");
        assert_eq!(fp, schema_fingerprint(&[]));
    }
}