Skip to main content

rivet_cli/
enrich.rs

1use std::sync::Arc;
2
3use arrow::array::{ArrayRef, Int64Array, TimestampMicrosecondArray};
4use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
5use arrow::record_batch::RecordBatch;
6
7use crate::config::MetaColumns;
8use crate::error::Result;
9
/// Name of the meta column that carries the export timestamp.
pub const COL_EXPORTED_AT: &str = "_rivet_exported_at";
/// Name of the meta column that carries the per-row hash.
pub const COL_ROW_HASH: &str = "_rivet_row_hash";
12
13/// Extend an Arrow schema with requested meta columns.
14pub fn enrich_schema(schema: &SchemaRef, meta: &MetaColumns) -> SchemaRef {
15    if !meta.exported_at && !meta.row_hash {
16        return schema.clone();
17    }
18    let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
19    if meta.exported_at {
20        fields.push(Arc::new(Field::new(
21            COL_EXPORTED_AT,
22            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
23            false,
24        )));
25    }
26    if meta.row_hash {
27        fields.push(Arc::new(Field::new(COL_ROW_HASH, DataType::Int64, false)));
28    }
29    Arc::new(Schema::new(fields))
30}
31
32/// Add meta columns to a RecordBatch.
33/// `exported_at_us` is a single microsecond-precision UTC timestamp shared by all rows.
34pub fn enrich_batch(
35    batch: &RecordBatch,
36    meta: &MetaColumns,
37    enriched_schema: &SchemaRef,
38    exported_at_us: i64,
39) -> Result<RecordBatch> {
40    if !meta.exported_at && !meta.row_hash {
41        return Ok(batch.clone());
42    }
43
44    let n = batch.num_rows();
45    let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
46
47    if meta.exported_at {
48        let ts_array =
49            TimestampMicrosecondArray::from(vec![Some(exported_at_us); n]).with_timezone("UTC");
50        columns.push(Arc::new(ts_array));
51    }
52
53    if meta.row_hash {
54        let mut hashes = Vec::with_capacity(n);
55        for row in 0..n {
56            hashes.push(hash_row(batch, row));
57        }
58        columns.push(Arc::new(Int64Array::from(hashes)));
59    }
60
61    Ok(RecordBatch::try_new(enriched_schema.clone(), columns)?)
62}
63
64/// Compute a deterministic 64-bit hash of all column values in a row.
65/// Uses the lower 64 bits of xxHash3-128 reinterpreted as signed i64.
66fn hash_row(batch: &RecordBatch, row: usize) -> i64 {
67    use xxhash_rust::xxh3::xxh3_128;
68
69    let mut buf = Vec::with_capacity(256);
70    for col_idx in 0..batch.num_columns() {
71        let array = batch.column(col_idx);
72        if array.is_null(row) {
73            buf.extend_from_slice(b"\x00");
74        } else {
75            let s = arrow::util::display::array_value_to_string(array, row).unwrap_or_default();
76            buf.extend_from_slice(s.as_bytes());
77        }
78        buf.push(b'\x1f'); // unit separator between fields
79    }
80    let h = xxh3_128(&buf);
81    h as i64 // lower 64 bits, reinterpreted as signed
82}
83
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::StringArray;
    use arrow::datatypes::Field;

    /// Build a `MetaColumns` with the given switches.
    fn meta_columns(exported_at: bool, row_hash: bool) -> MetaColumns {
        MetaColumns {
            exported_at,
            row_hash,
        }
    }

    /// Borrow column `idx` of `batch` as an `Int64Array`; panics on a type mismatch.
    fn int64_column(batch: &RecordBatch, idx: usize) -> &Int64Array {
        batch
            .column(idx)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap()
    }

    /// Three-row batch with a non-nullable `id` and a nullable `name` column.
    fn sample_batch() -> (SchemaRef, RecordBatch) {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let ids: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let names: ArrayRef = Arc::new(StringArray::from(vec![
            Some("alice"),
            None,
            Some("charlie"),
        ]));
        let batch = RecordBatch::try_new(schema.clone(), vec![ids, names]).unwrap();
        (schema, batch)
    }

    #[test]
    fn enrich_disabled_is_noop() {
        let (schema, batch) = sample_batch();
        let meta = meta_columns(false, false);

        let enriched_schema = enrich_schema(&schema, &meta);
        assert_eq!(enriched_schema.fields().len(), 2);

        let result = enrich_batch(&batch, &meta, &enriched_schema, 0).unwrap();
        assert_eq!(result.num_columns(), 2);
    }

    #[test]
    fn enrich_exported_at_only() {
        let (schema, batch) = sample_batch();
        let meta = meta_columns(true, false);

        let enriched_schema = enrich_schema(&schema, &meta);
        assert_eq!(enriched_schema.fields().len(), 3);
        assert_eq!(enriched_schema.field(2).name(), COL_EXPORTED_AT);

        let ts = 1_711_612_800_000_000i64;
        let result = enrich_batch(&batch, &meta, &enriched_schema, ts).unwrap();
        assert_eq!(result.num_columns(), 3);
        assert_eq!(result.num_rows(), 3);

        let ts_col = result
            .column(2)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert_eq!(ts_col.value(0), ts);
        assert_eq!(ts_col.value(2), ts);
    }

    #[test]
    fn enrich_row_hash_only() {
        let (schema, batch) = sample_batch();
        let meta = meta_columns(false, true);

        let enriched_schema = enrich_schema(&schema, &meta);
        assert_eq!(enriched_schema.field(2).name(), COL_ROW_HASH);
        assert_eq!(*enriched_schema.field(2).data_type(), DataType::Int64);

        let result = enrich_batch(&batch, &meta, &enriched_schema, 0).unwrap();
        let hash_col = int64_column(&result, 2);

        // Different rows produce different hashes
        assert_ne!(hash_col.value(0), hash_col.value(1));
        assert_ne!(hash_col.value(1), hash_col.value(2));
    }

    #[test]
    fn enrich_both_columns() {
        let (schema, batch) = sample_batch();
        let meta = meta_columns(true, true);

        let enriched_schema = enrich_schema(&schema, &meta);
        assert_eq!(enriched_schema.fields().len(), 4);
        assert_eq!(enriched_schema.field(2).name(), COL_EXPORTED_AT);
        assert_eq!(enriched_schema.field(3).name(), COL_ROW_HASH);

        let result = enrich_batch(&batch, &meta, &enriched_schema, 123456).unwrap();
        assert_eq!(result.num_columns(), 4);
        assert_eq!(result.num_rows(), 3);
    }

    #[test]
    fn hash_is_deterministic() {
        let (schema, batch) = sample_batch();
        let meta = meta_columns(false, true);
        let enriched_schema = enrich_schema(&schema, &meta);

        // Enrich the same batch twice and compare the hash columns row by row.
        let r1 = enrich_batch(&batch, &meta, &enriched_schema, 0).unwrap();
        let r2 = enrich_batch(&batch, &meta, &enriched_schema, 0).unwrap();

        let h1 = int64_column(&r1, 2);
        let h2 = int64_column(&r2, 2);
        for i in 0..3 {
            assert_eq!(
                h1.value(i),
                h2.value(i),
                "hash should be deterministic for row {i}"
            );
        }
    }

    #[test]
    fn hash_distinguishes_null_from_empty() {
        let schema = Arc::new(Schema::new(vec![Field::new("val", DataType::Utf8, true)]));
        let values: ArrayRef = Arc::new(StringArray::from(vec![None, Some("")]));
        let batch = RecordBatch::try_new(schema.clone(), vec![values]).unwrap();

        let meta = meta_columns(false, true);
        let enriched_schema = enrich_schema(&schema, &meta);
        let result = enrich_batch(&batch, &meta, &enriched_schema, 0).unwrap();

        let hashes = int64_column(&result, 1);
        assert_ne!(
            hashes.value(0),
            hashes.value(1),
            "NULL and empty string should hash differently"
        );
    }
}