1use std::collections::HashMap;
2use std::time::{SystemTime, UNIX_EPOCH};
3
4use sha2::{Digest, Sha256};
5use zer_core::{
6 field_mapping::FieldMapping,
7 record::{FieldValue, Record},
8 schema::{FieldKind, Schema},
9};
10
11#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
13pub struct FieldStats {
14 pub name: String,
15 pub kind: FieldKind,
16 pub null_rate: f32,
18 pub cardinality: usize,
20 pub top_k: Vec<String>,
22}
23
24#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
30pub struct SchemaFingerprint {
31 pub schema_hash: [u8; 32],
33 pub field_stats: Vec<FieldStats>,
35 pub record_count: u64,
37 pub created_at: u64,
39}
40
41fn compute_schema_hash(schema: &Schema, mappings: &[FieldMapping]) -> [u8; 32] {
51 let mut sorted: Vec<_> = schema.fields.iter().collect();
52 sorted.sort_by_key(|f| f.name.as_str());
53
54 let mut hasher = Sha256::new();
55 for field in sorted {
56 let kind_bytes = bincode::serialize(&field.kind).unwrap_or_default();
59 hasher.update(field.name.as_bytes());
60 hasher.update(b":");
61 hasher.update(&kind_bytes);
62 hasher.update(b"|");
63 }
64
65 if !mappings.is_empty() {
66 hasher.update(b"mappings:");
67 let mut sorted_m: Vec<_> = mappings.iter().collect();
68 sorted_m.sort_by(|a, b| a.a_field.cmp(&b.a_field));
69 for m in sorted_m {
70 hasher.update(m.a_field.as_bytes());
71 hasher.update(b":");
72 hasher.update(m.b_field.as_bytes());
73 hasher.update(b"|");
74 }
75 }
76
77 hasher.finalize().into()
78}
79
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, zero is returned
/// instead of an error.
fn unix_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        // Clock is set before 1970: fall back to the zero duration.
        Err(_) => 0,
    }
}
86
87fn field_value_to_string(v: &FieldValue) -> Option<String> {
90 match v {
91 FieldValue::Text(s) if !s.is_empty() => Some(s.clone()),
92 FieldValue::Int(i) => Some(i.to_string()),
93 FieldValue::Float(f) => Some(f.to_string()),
94 FieldValue::Bool(b) => Some(b.to_string()),
95 _ => None,
96 }
97}
98
99fn compute_field_stats(name: &str, kind: FieldKind, records: &[Record]) -> FieldStats {
101 let total = records.len();
102 if total == 0 {
103 return FieldStats {
104 name: name.to_string(),
105 kind,
106 null_rate: 0.0,
107 cardinality: 0,
108 top_k: vec![],
109 };
110 }
111
112 let mut null_count = 0usize;
113 let mut freq: HashMap<String, usize> = HashMap::new();
114
115 for record in records {
116 match record.fields.get(name) {
117 None | Some(FieldValue::Null) => null_count += 1,
118 Some(v) => match field_value_to_string(v) {
119 Some(s) => *freq.entry(s).or_insert(0) += 1,
120 None => null_count += 1,
121 },
122 }
123 }
124
125 let null_rate = null_count as f32 / total as f32;
126 let cardinality = freq.len();
127
128 let mut freq_vec: Vec<(String, usize)> = freq.into_iter().collect();
130 freq_vec.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
131 let top_k = freq_vec.into_iter().take(10).map(|(s, _)| s).collect();
132
133 FieldStats { name: name.to_string(), kind, null_rate, cardinality, top_k }
134}
135
136impl SchemaFingerprint {
139 pub fn from_schema(schema: &Schema) -> Self {
145 Self::from_schema_with_mappings(schema, &[])
146 }
147
148 pub fn from_schema_with_mappings(schema: &Schema, mappings: &[FieldMapping]) -> Self {
151 let schema_hash = compute_schema_hash(schema, mappings);
152 let field_stats = schema
153 .fields
154 .iter()
155 .map(|f| FieldStats {
156 name: f.name.clone(),
157 kind: f.kind,
158 null_rate: 0.0,
159 cardinality: 0,
160 top_k: vec![],
161 })
162 .collect();
163 Self { schema_hash, field_stats, record_count: 0, created_at: unix_now() }
164 }
165
166 pub fn from_sample(schema: &Schema, records: &[Record]) -> Self {
172 Self::from_sample_with_mappings(schema, records, &[])
173 }
174
175 pub fn from_sample_with_mappings(
177 schema: &Schema,
178 records: &[Record],
179 mappings: &[FieldMapping],
180 ) -> Self {
181 let schema_hash = compute_schema_hash(schema, mappings);
182 let field_stats = schema
183 .fields
184 .iter()
185 .map(|f| compute_field_stats(&f.name, f.kind, records))
186 .collect();
187 Self {
188 schema_hash,
189 field_stats,
190 record_count: records.len() as u64,
191 created_at: unix_now(),
192 }
193 }
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use zer_core::schema::SchemaBuilder;

    /// Two-field schema (`alpha: Name`, `beta: Date`) shared by several tests.
    fn make_schema_ab() -> Schema {
        SchemaBuilder::new()
            .field("alpha", FieldKind::Name)
            .field("beta", FieldKind::Date)
            .build()
            .unwrap()
    }

    /// Hashing is a pure function of the schema contents.
    #[test]
    fn same_schema_same_hash() {
        let first = compute_schema_hash(&make_schema_ab(), &[]);
        let second = compute_schema_hash(&make_schema_ab(), &[]);
        assert_eq!(
            first, second,
            "identical schemas must produce identical hashes"
        );
    }

    /// Declaring the same fields in a different order yields the same hash.
    #[test]
    fn reordered_fields_same_hash() {
        let forward = SchemaBuilder::new()
            .field("alpha", FieldKind::Name)
            .field("beta", FieldKind::Date)
            .build()
            .unwrap();
        let reversed = SchemaBuilder::new()
            .field("beta", FieldKind::Date)
            .field("alpha", FieldKind::Name)
            .build()
            .unwrap();

        let forward_hash = compute_schema_hash(&forward, &[]);
        let reversed_hash = compute_schema_hash(&reversed, &[]);
        assert_eq!(
            forward_hash, reversed_hash,
            "field order must not affect schema hash"
        );
    }

    /// The field kind participates in the hash, not just the name.
    #[test]
    fn different_kinds_different_hash() {
        let as_name = SchemaBuilder::new()
            .field("alpha", FieldKind::Name)
            .build()
            .unwrap();
        let as_date = SchemaBuilder::new()
            .field("alpha", FieldKind::Date)
            .build()
            .unwrap();

        let name_hash = compute_schema_hash(&as_name, &[]);
        let date_hash = compute_schema_hash(&as_date, &[]);
        assert_ne!(
            name_hash, date_hash,
            "same field name with different kinds must produce different hashes"
        );
    }

    /// A schema-only fingerprint lists every field and reports no records.
    #[test]
    fn from_schema_populates_field_names() {
        let fingerprint = SchemaFingerprint::from_schema(&make_schema_ab());

        assert_eq!(fingerprint.field_stats.len(), 2);
        assert_eq!(fingerprint.record_count, 0);
        assert!(fingerprint.field_stats.iter().any(|f| f.name == "alpha"));
        assert!(fingerprint.field_stats.iter().any(|f| f.name == "beta"));
    }

    /// Sampled statistics: two distinct names, one record without the field.
    #[test]
    fn from_sample_computes_cardinality_and_null_rate() {
        let schema = SchemaBuilder::new()
            .field("name", FieldKind::Name)
            .build()
            .unwrap();

        let records = vec![
            Record::new(1).insert("name", FieldValue::Text("Alice".into())),
            Record::new(2).insert("name", FieldValue::Text("Bob".into())),
            Record::new(3).insert("name", FieldValue::Text("Alice".into())),
            // No "name" field at all: must be counted as null.
            Record::new(4),
        ];

        let fingerprint = SchemaFingerprint::from_sample(&schema, &records);
        assert_eq!(fingerprint.record_count, 4);

        let stats = fingerprint
            .field_stats
            .iter()
            .find(|f| f.name == "name")
            .unwrap();
        assert_eq!(stats.cardinality, 2, "Alice and Bob are 2 distinct values");

        let deviation = (stats.null_rate - 0.25).abs();
        assert!(deviation < 1e-6, "1 out of 4 records is null");

        assert_eq!(
            stats.top_k[0], "Alice",
            "Alice appears twice, so it should be first"
        );
    }

    /// Sample data affects statistics only, never the structural hash.
    #[test]
    fn from_schema_and_from_sample_same_hash_for_same_schema() {
        let schema = make_schema_ab();
        let sample = vec![Record::new(1)
            .insert("alpha", FieldValue::Text("x".into()))
            .insert("beta", FieldValue::Text("2024-01-01".into()))];

        let schema_only = SchemaFingerprint::from_schema(&schema);
        let with_sample = SchemaFingerprint::from_sample(&schema, &sample);

        assert_eq!(
            schema_only.schema_hash, with_sample.schema_hash,
            "from_schema and from_sample must yield the same hash for the same schema"
        );
    }
}