1use std::collections::HashMap;
2use std::time::{SystemTime, UNIX_EPOCH};
3
4use sha2::{Digest, Sha256};
5use zer_core::{
6 field_mapping::FieldMapping,
7 record::{FieldValue, Record},
8 schema::{FieldKind, Schema},
9};
10
11#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
13pub struct FieldStats {
14 pub name: String,
15 pub kind: FieldKind,
16 pub null_rate: f32,
18 pub cardinality: usize,
20 pub top_k: Vec<String>,
22}
23
24#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
30pub struct SchemaFingerprint {
31 pub schema_hash: [u8; 32],
33 pub field_stats: Vec<FieldStats>,
35 pub record_count: u64,
37 pub created_at: u64,
39}
40
41fn compute_schema_hash(schema: &Schema, mappings: &[FieldMapping]) -> [u8; 32] {
51 let mut sorted: Vec<_> = schema.fields.iter().collect();
52 sorted.sort_by_key(|f| f.name.as_str());
53
54 let mut hasher = Sha256::new();
55 for field in sorted {
56 let kind_bytes = bincode::serialize(&field.kind).unwrap_or_default();
59 hasher.update(field.name.as_bytes());
60 hasher.update(b":");
61 hasher.update(&kind_bytes);
62 hasher.update(b"|");
63 }
64
65 if !mappings.is_empty() {
66 hasher.update(b"mappings:");
67 let mut sorted_m: Vec<_> = mappings.iter().collect();
68 sorted_m.sort_by(|a, b| a.a_field.cmp(&b.a_field));
69 for m in sorted_m {
70 hasher.update(m.a_field.as_bytes());
71 hasher.update(b":");
72 hasher.update(m.b_field.as_bytes());
73 hasher.update(b"|");
74 }
75 }
76
77 hasher.finalize().into()
78}
79
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned
/// instead of failing.
fn unix_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        // Clock is set before 1970: fall back to a zero duration.
        Err(_) => 0,
    }
}
86
87fn field_value_to_string(v: &FieldValue) -> Option<String> {
90 match v {
91 FieldValue::Text(s) if !s.is_empty() => Some(s.clone()),
92 FieldValue::Int(i) => Some(i.to_string()),
93 FieldValue::Float(f) => Some(f.to_string()),
94 FieldValue::Bool(b) => Some(b.to_string()),
95 _ => None,
96 }
97}
98
99fn compute_field_stats(name: &str, kind: FieldKind, records: &[Record]) -> FieldStats {
101 let total = records.len();
102 if total == 0 {
103 return FieldStats {
104 name: name.to_string(),
105 kind,
106 null_rate: 0.0,
107 cardinality: 0,
108 top_k: vec![],
109 };
110 }
111
112 let mut null_count = 0usize;
113 let mut freq: HashMap<String, usize> = HashMap::new();
114
115 for record in records {
116 match record.fields.get(name) {
117 None | Some(FieldValue::Null) => null_count += 1,
118 Some(v) => match field_value_to_string(v) {
119 Some(s) => *freq.entry(s).or_insert(0) += 1,
120 None => null_count += 1,
121 },
122 }
123 }
124
125 let null_rate = null_count as f32 / total as f32;
126 let cardinality = freq.len();
127
128 let mut freq_vec: Vec<(String, usize)> = freq.into_iter().collect();
130 freq_vec.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
131 let top_k = freq_vec.into_iter().take(10).map(|(s, _)| s).collect();
132
133 FieldStats {
134 name: name.to_string(),
135 kind,
136 null_rate,
137 cardinality,
138 top_k,
139 }
140}
141
142impl SchemaFingerprint {
145 pub fn from_schema(schema: &Schema) -> Self {
151 Self::from_schema_with_mappings(schema, &[])
152 }
153
154 pub fn from_schema_with_mappings(schema: &Schema, mappings: &[FieldMapping]) -> Self {
157 let schema_hash = compute_schema_hash(schema, mappings);
158 let field_stats = schema
159 .fields
160 .iter()
161 .map(|f| FieldStats {
162 name: f.name.clone(),
163 kind: f.kind,
164 null_rate: 0.0,
165 cardinality: 0,
166 top_k: vec![],
167 })
168 .collect();
169 Self {
170 schema_hash,
171 field_stats,
172 record_count: 0,
173 created_at: unix_now(),
174 }
175 }
176
177 pub fn from_sample(schema: &Schema, records: &[Record]) -> Self {
183 Self::from_sample_with_mappings(schema, records, &[])
184 }
185
186 pub fn from_sample_with_mappings(
188 schema: &Schema,
189 records: &[Record],
190 mappings: &[FieldMapping],
191 ) -> Self {
192 let schema_hash = compute_schema_hash(schema, mappings);
193 let field_stats = schema
194 .fields
195 .iter()
196 .map(|f| compute_field_stats(&f.name, f.kind, records))
197 .collect();
198 Self {
199 schema_hash,
200 field_stats,
201 record_count: records.len() as u64,
202 created_at: unix_now(),
203 }
204 }
205}
206
#[cfg(test)]
mod tests {
    use super::*;
    use zer_core::schema::SchemaBuilder;

    /// Two-field schema (`alpha`: name, `beta`: date) shared by several tests.
    fn make_schema_ab() -> Schema {
        SchemaBuilder::new()
            .field("alpha", FieldKind::Name)
            .field("beta", FieldKind::Date)
            .build()
            .unwrap()
    }

    #[test]
    fn same_schema_same_hash() {
        let first = compute_schema_hash(&make_schema_ab(), &[]);
        let second = compute_schema_hash(&make_schema_ab(), &[]);

        assert_eq!(
            first, second,
            "identical schemas must produce identical hashes"
        );
    }

    #[test]
    fn reordered_fields_same_hash() {
        let forward = SchemaBuilder::new()
            .field("alpha", FieldKind::Name)
            .field("beta", FieldKind::Date)
            .build()
            .unwrap();
        let reversed = SchemaBuilder::new()
            .field("beta", FieldKind::Date)
            .field("alpha", FieldKind::Name)
            .build()
            .unwrap();

        let forward_hash = compute_schema_hash(&forward, &[]);
        let reversed_hash = compute_schema_hash(&reversed, &[]);

        assert_eq!(
            forward_hash, reversed_hash,
            "field order must not affect schema hash"
        );
    }

    #[test]
    fn different_kinds_different_hash() {
        let as_name = SchemaBuilder::new()
            .field("alpha", FieldKind::Name)
            .build()
            .unwrap();
        let as_date = SchemaBuilder::new()
            .field("alpha", FieldKind::Date)
            .build()
            .unwrap();

        let name_hash = compute_schema_hash(&as_name, &[]);
        let date_hash = compute_schema_hash(&as_date, &[]);

        assert_ne!(
            name_hash, date_hash,
            "same field name with different kinds must produce different hashes"
        );
    }

    #[test]
    fn from_schema_populates_field_names() {
        let fingerprint = SchemaFingerprint::from_schema(&make_schema_ab());

        assert_eq!(fingerprint.field_stats.len(), 2);
        assert_eq!(fingerprint.record_count, 0);

        let has_field = |wanted: &str| {
            fingerprint
                .field_stats
                .iter()
                .any(|stats| stats.name == wanted)
        };
        assert!(has_field("alpha"));
        assert!(has_field("beta"));
    }

    #[test]
    fn from_sample_computes_cardinality_and_null_rate() {
        let schema = SchemaBuilder::new()
            .field("name", FieldKind::Name)
            .build()
            .unwrap();

        // Three named records plus one record that lacks the field entirely.
        let sample = vec![
            Record::new(1).insert("name", FieldValue::Text("Alice".into())),
            Record::new(2).insert("name", FieldValue::Text("Bob".into())),
            Record::new(3).insert("name", FieldValue::Text("Alice".into())),
            Record::new(4),
        ];

        let fingerprint = SchemaFingerprint::from_sample(&schema, &sample);
        assert_eq!(fingerprint.record_count, 4);

        let name_stats = fingerprint
            .field_stats
            .iter()
            .find(|stats| stats.name == "name")
            .unwrap();

        assert_eq!(
            name_stats.cardinality, 2,
            "Alice and Bob are 2 distinct values"
        );
        assert!(
            (name_stats.null_rate - 0.25).abs() < 1e-6,
            "1 out of 4 records is null"
        );
        assert_eq!(
            name_stats.top_k[0], "Alice",
            "Alice appears twice, so it should be first"
        );
    }

    #[test]
    fn from_schema_and_from_sample_same_hash_for_same_schema() {
        let schema = make_schema_ab();
        let sample = vec![Record::new(1)
            .insert("alpha", FieldValue::Text("x".into()))
            .insert("beta", FieldValue::Text("2024-01-01".into()))];

        let schema_only = SchemaFingerprint::from_schema(&schema);
        let with_sample = SchemaFingerprint::from_sample(&schema, &sample);

        assert_eq!(
            schema_only.schema_hash, with_sample.schema_hash,
            "from_schema and from_sample must yield the same hash for the same schema"
        );
    }
}