1use std::sync::Arc;
2
3use arrow::array::{ArrayRef, Int64Array, TimestampMicrosecondArray};
4use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
5use arrow::record_batch::RecordBatch;
6
7use crate::config::MetaColumns;
8use crate::error::Result;
9
/// Name of the export-timestamp meta column that `enrich_schema` appends
/// when `MetaColumns::exported_at` is set.
pub const COL_EXPORTED_AT: &str = "_rivet_exported_at";
/// Name of the per-row hash meta column that `enrich_schema` appends
/// when `MetaColumns::row_hash` is set.
pub const COL_ROW_HASH: &str = "_rivet_row_hash";
12
13pub fn enrich_schema(schema: &SchemaRef, meta: &MetaColumns) -> SchemaRef {
15 if !meta.exported_at && !meta.row_hash {
16 return schema.clone();
17 }
18 let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
19 if meta.exported_at {
20 fields.push(Arc::new(Field::new(
21 COL_EXPORTED_AT,
22 DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
23 false,
24 )));
25 }
26 if meta.row_hash {
27 fields.push(Arc::new(Field::new(COL_ROW_HASH, DataType::Int64, false)));
28 }
29 Arc::new(Schema::new(fields))
30}
31
32pub fn enrich_batch(
35 batch: &RecordBatch,
36 meta: &MetaColumns,
37 enriched_schema: &SchemaRef,
38 exported_at_us: i64,
39) -> Result<RecordBatch> {
40 if !meta.exported_at && !meta.row_hash {
41 return Ok(batch.clone());
42 }
43
44 let n = batch.num_rows();
45 let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
46
47 if meta.exported_at {
48 let ts_array =
49 TimestampMicrosecondArray::from(vec![Some(exported_at_us); n]).with_timezone("UTC");
50 columns.push(Arc::new(ts_array));
51 }
52
53 if meta.row_hash {
54 let mut hashes = Vec::with_capacity(n);
55 for row in 0..n {
56 hashes.push(hash_row(batch, row));
57 }
58 columns.push(Arc::new(Int64Array::from(hashes)));
59 }
60
61 Ok(RecordBatch::try_new(enriched_schema.clone(), columns)?)
62}
63
64fn hash_row(batch: &RecordBatch, row: usize) -> i64 {
67 use xxhash_rust::xxh3::xxh3_128;
68
69 let mut buf = Vec::with_capacity(256);
70 for col_idx in 0..batch.num_columns() {
71 let array = batch.column(col_idx);
72 if array.is_null(row) {
73 buf.extend_from_slice(b"\x00");
74 } else {
75 let s = arrow::util::display::array_value_to_string(array, row).unwrap_or_default();
76 buf.extend_from_slice(s.as_bytes());
77 }
78 buf.push(b'\x1f'); }
80 let h = xxh3_128(&buf);
81 h as i64 }
83
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::StringArray;
    use arrow::datatypes::Field;

    /// Builds a `MetaColumns` with the given toggles.
    fn meta_columns(exported_at: bool, row_hash: bool) -> MetaColumns {
        MetaColumns {
            exported_at,
            row_hash,
        }
    }

    /// Returns column `idx` of `batch` as an `Int64Array`, panicking if the
    /// column has a different type.
    fn int64_column(batch: &RecordBatch, idx: usize) -> &Int64Array {
        batch
            .column(idx)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap()
    }

    /// Three-row fixture: a non-nullable `id` column and a nullable `name`
    /// column whose middle row is NULL.
    fn sample_batch() -> (SchemaRef, RecordBatch) {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
        ];
        let schema = Arc::new(Schema::new(fields));

        let ids: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let names: ArrayRef = Arc::new(StringArray::from(vec![
            Some("alice"),
            None,
            Some("charlie"),
        ]));

        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![ids, names]).unwrap();
        (schema, batch)
    }

    #[test]
    fn enrich_disabled_is_noop() {
        let (schema, batch) = sample_batch();
        let meta = meta_columns(false, false);

        let enriched_schema = enrich_schema(&schema, &meta);
        assert_eq!(enriched_schema.fields().len(), 2);

        let result = enrich_batch(&batch, &meta, &enriched_schema, 0).unwrap();
        assert_eq!(result.num_columns(), 2);
    }

    #[test]
    fn enrich_exported_at_only() {
        let (schema, batch) = sample_batch();
        let meta = meta_columns(true, false);

        let enriched_schema = enrich_schema(&schema, &meta);
        assert_eq!(enriched_schema.fields().len(), 3);
        assert_eq!(enriched_schema.field(2).name(), COL_EXPORTED_AT);

        let ts = 1_711_612_800_000_000i64;
        let result = enrich_batch(&batch, &meta, &enriched_schema, ts).unwrap();
        assert_eq!(result.num_columns(), 3);
        assert_eq!(result.num_rows(), 3);

        let ts_col = result
            .column(2)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert_eq!(ts_col.value(0), ts);
        assert_eq!(ts_col.value(2), ts);
    }

    #[test]
    fn enrich_row_hash_only() {
        let (schema, batch) = sample_batch();
        let meta = meta_columns(false, true);

        let enriched_schema = enrich_schema(&schema, &meta);
        assert_eq!(enriched_schema.field(2).name(), COL_ROW_HASH);
        assert_eq!(*enriched_schema.field(2).data_type(), DataType::Int64);

        let result = enrich_batch(&batch, &meta, &enriched_schema, 0).unwrap();
        let hash_col = int64_column(&result, 2);

        // Neighbouring rows differ in content, so their hashes must differ.
        assert_ne!(hash_col.value(0), hash_col.value(1));
        assert_ne!(hash_col.value(1), hash_col.value(2));
    }

    #[test]
    fn enrich_both_columns() {
        let (schema, batch) = sample_batch();
        let meta = meta_columns(true, true);

        let enriched_schema = enrich_schema(&schema, &meta);
        assert_eq!(enriched_schema.fields().len(), 4);
        assert_eq!(enriched_schema.field(2).name(), COL_EXPORTED_AT);
        assert_eq!(enriched_schema.field(3).name(), COL_ROW_HASH);

        let result = enrich_batch(&batch, &meta, &enriched_schema, 123456).unwrap();
        assert_eq!(result.num_columns(), 4);
        assert_eq!(result.num_rows(), 3);
    }

    #[test]
    fn hash_is_deterministic() {
        let (schema, batch) = sample_batch();
        let meta = meta_columns(false, true);
        let enriched_schema = enrich_schema(&schema, &meta);

        let r1 = enrich_batch(&batch, &meta, &enriched_schema, 0).unwrap();
        let r2 = enrich_batch(&batch, &meta, &enriched_schema, 0).unwrap();

        let h1 = int64_column(&r1, 2);
        let h2 = int64_column(&r2, 2);
        for i in 0..3 {
            assert_eq!(
                h1.value(i),
                h2.value(i),
                "hash should be deterministic for row {i}"
            );
        }
    }

    #[test]
    fn hash_distinguishes_null_from_empty() {
        let schema = Arc::new(Schema::new(vec![Field::new("val", DataType::Utf8, true)]));
        let values: ArrayRef = Arc::new(StringArray::from(vec![None, Some("")]));
        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![values]).unwrap();

        let meta = meta_columns(false, true);
        let enriched_schema = enrich_schema(&schema, &meta);
        let result = enrich_batch(&batch, &meta, &enriched_schema, 0).unwrap();

        let hashes = int64_column(&result, 1);
        assert_ne!(
            hashes.value(0),
            hashes.value(1),
            "NULL and empty string should hash differently"
        );
    }
}