1use std::collections::BTreeMap;
2
3use serde::{Deserialize, Serialize};
4
5use crate::{
6 context::{
7 BooleanContext, BytesContext, MapStructContext, NullContext, NumberContext,
8 SequenceContext, StringContext,
9 },
10 Coalesce, StructuralEq,
11};
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
18#[serde(tag = "type")]
19pub enum Schema {
20 Null(NullContext),
25 Boolean(BooleanContext),
27 Integer(NumberContext<i128>),
29 Float(NumberContext<f64>),
31 String(StringContext),
33 Bytes(BytesContext),
35 Sequence {
38 field: Box<Field>,
40 context: SequenceContext,
43 },
44 Struct {
48 fields: BTreeMap<String, Field>,
51 context: MapStructContext,
54 },
55 Union {
60 variants: Vec<Schema>,
62 },
63 }
66
67#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
71pub struct Field {
72 #[serde(flatten)]
75 pub status: FieldStatus,
76 #[serde(flatten)]
79 pub schema: Option<Schema>,
80}
81
82#[derive(Debug, Clone, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
84pub struct FieldStatus {
85 pub may_be_null: bool,
87 pub may_be_normal: bool,
90 pub may_be_missing: bool,
94 pub may_be_duplicate: bool,
99}
100
101impl StructuralEq for Schema {
105 fn structural_eq(&self, other: &Self) -> bool {
106 use Schema::*;
107 match (self, other) {
108 (Null(_), Null(_)) => true,
109 (Boolean(_), Boolean(_)) => true,
110 (Integer(_), Integer(_)) => true,
111 (Float(_), Float(_)) => true,
112 (String(_), String(_)) => true,
113 (Bytes(_), Bytes(_)) => true,
114
115 (Sequence { field: field_1, .. }, Sequence { field: field_2, .. }) => {
116 field_1.structural_eq(field_2)
117 }
118
119 (
120 Struct {
121 fields: fields_1, ..
122 },
123 Struct {
124 fields: fields_2, ..
125 },
126 ) => fields_1.structural_eq(fields_2),
127
128 (Union { variants: s }, Union { variants: o }) => {
129 let mut s = s.clone();
130 let mut o = o.clone();
131 s.sort_by(schema_cmp);
132 o.sort_by(schema_cmp);
133 s.structural_eq(&o)
134 }
135
136 (Null(_), _)
138 | (Boolean(_), _)
139 | (Integer(_), _)
140 | (Float(_), _)
141 | (String(_), _)
142 | (Bytes(_), _)
143 | (Sequence { .. }, _)
144 | (Struct { .. }, _)
145 | (Union { .. }, _) => false,
146 }
147 }
148}
149impl Coalesce for Schema {
150 fn coalesce(&mut self, other: Self) {
151 use Schema::*;
152 match (self, other) {
153 (Boolean(s), Boolean(o)) => s.coalesce(o),
154 (Integer(s), Integer(o)) => s.coalesce(o),
155 (Float(s), Float(o)) => s.coalesce(o),
156 (String(s), String(o)) => s.coalesce(o),
157 (Bytes(s), Bytes(o)) => s.coalesce(o),
158
159 (
160 Sequence {
161 field: self_boxed,
162 context: self_agg,
163 },
164 Sequence {
165 field: other_boxed,
166 context: other_agg,
167 },
168 ) => {
169 self_agg.coalesce(other_agg);
170 self_boxed.coalesce(*other_boxed);
171 }
172
173 (
174 Struct {
175 fields: self_fields,
176 context: self_agg,
177 },
178 Struct {
179 fields: other_fields,
180 context: other_agg,
181 },
182 ) => {
183 self_agg.coalesce(other_agg);
184 for (name, other_schema) in other_fields {
185 self_fields
186 .entry(name)
187 .and_modify(|schema| schema.coalesce(other_schema.clone()))
188 .or_insert_with(|| other_schema);
189 }
190 }
191 (
192 Union {
193 variants: self_alternatives,
194 },
195 Union {
196 variants: other_alternatives,
197 },
198 ) => coalesce_unions(self_alternatives, other_alternatives),
199 (
200 Union {
201 variants: self_alternatives,
202 },
203 any_other,
204 ) => coalesce_to_alternatives(self_alternatives, any_other),
205 (
206 any_self,
207 Union {
208 variants: mut other_alternatives,
209 },
210 ) => {
211 let self_original = std::mem::replace(any_self, Schema::Null(Default::default()));
212 coalesce_to_alternatives(&mut other_alternatives, self_original);
213 *any_self = Schema::Union {
214 variants: other_alternatives,
215 };
216 }
217
218 (any_self, any_other) => {
219 let self_original = std::mem::replace(any_self, Schema::Null(Default::default()));
220 *any_self = Union {
221 variants: vec![self_original, any_other],
222 };
223 }
224 };
225 return;
226
227 fn coalesce_unions(selfs: &mut Vec<Schema>, others: Vec<Schema>) {
228 for o in others {
229 coalesce_to_alternatives(selfs, o);
230 }
231 }
232
233 fn coalesce_to_alternatives(alternatives: &mut Vec<Schema>, mut other: Schema) {
237 use Schema::*;
238 for s in alternatives.iter_mut() {
239 match (s, other) {
240 (Union { .. }, _) | (_, Union { .. }) => {
243 unreachable!("nested union")
244 }
245
246 (Boolean(s), Boolean(o)) => {
248 s.coalesce(o);
249 return;
250 }
251 (Integer(s), Integer(o)) => {
252 s.coalesce(o);
253 return;
254 }
255 (Float(s), Float(o)) => {
256 s.coalesce(o);
257 return;
258 }
259 (String(s), String(o)) => {
260 s.coalesce(o);
261 return;
262 }
263 (Bytes(s), Bytes(o)) => {
264 s.coalesce(o);
265 return;
266 }
267
268 (
269 Sequence {
270 field: self_boxed,
271 context: self_agg,
272 },
273 Sequence {
274 field: other_boxed,
275 context: other_agg,
276 },
277 ) => {
278 self_agg.coalesce(other_agg);
279 self_boxed.coalesce(*other_boxed);
280 return;
281 }
282
283 (
284 Struct {
285 fields: self_fields,
286 context: self_agg,
287 },
288 Struct {
289 fields: other_fields,
290 context: other_agg,
291 },
292 ) => {
293 self_agg.coalesce(other_agg);
294 for (name, other_schema) in other_fields {
295 self_fields
296 .entry(name)
297 .and_modify(|schema| schema.coalesce(other_schema.clone()))
298 .or_insert_with(|| other_schema);
299 }
300 return;
301 }
302
303 (_, caught_other) => {
305 other = caught_other;
306 }
307 }
308 }
309
310 alternatives.push(other);
312 }
313 }
314}
315impl PartialEq for Schema {
316 fn eq(&self, other: &Self) -> bool {
317 use Schema::*;
318 match (self, other) {
319 (Null(s), Null(o)) => s == o,
320 (Boolean(s), Boolean(o)) => s == o,
321 (Integer(s), Integer(o)) => s == o,
322 (Float(s), Float(o)) => s == o,
323 (String(s), String(o)) => s == o,
324 (Bytes(s), Bytes(o)) => s == o,
325
326 (
327 Sequence {
328 field: field_1,
329 context: context_1,
330 },
331 Sequence {
332 field: field_2,
333 context: context_2,
334 },
335 ) => field_1 == field_2 && context_1 == context_2,
336
337 (
338 Struct {
339 fields: fields_1,
340 context: context_1,
341 },
342 Struct {
343 fields: fields_2,
344 context: context_2,
345 },
346 ) => fields_1 == fields_2 && context_1 == context_2,
347
348 (Union { variants: s }, Union { variants: o }) => {
349 let mut s = s.clone();
350 let mut o = o.clone();
351 s.sort_by(schema_cmp);
352 o.sort_by(schema_cmp);
353 s == o
354 }
355
356 (Null(_), _)
358 | (Boolean(_), _)
359 | (Integer(_), _)
360 | (Float(_), _)
361 | (String(_), _)
362 | (Bytes(_), _)
363 | (Sequence { .. }, _)
364 | (Struct { .. }, _)
365 | (Union { .. }, _) => false,
366 }
367 }
368}
369
370impl Field {
374 pub fn with_schema(schema: Schema) -> Self {
376 Self {
377 status: FieldStatus::default(),
378 schema: Some(schema),
379 }
380 }
381}
382impl Coalesce for Field {
383 fn coalesce(&mut self, other: Self)
384 where
385 Self: Sized,
386 {
387 self.status.coalesce(other.status);
388 self.schema = match (self.schema.take(), other.schema) {
389 (Some(mut s), Some(o)) => {
390 s.coalesce(o);
391 Some(s)
392 }
393 (Some(s), None) => Some(s),
394 (None, Some(o)) => Some(o),
395 (None, None) => None,
396 }
397 }
398}
399impl StructuralEq for Field {
400 fn structural_eq(&self, other: &Self) -> bool {
401 self.status == other.status && self.schema.structural_eq(&other.schema)
402 }
403}
404
405impl FieldStatus {
409 pub fn allow_duplicates(&mut self, is_duplicate: bool) {
412 self.may_be_duplicate |= is_duplicate;
413 }
414 pub fn is_option(&self) -> bool {
416 self.may_be_null || self.may_be_missing
417 }
418}
419impl Coalesce for FieldStatus {
420 fn coalesce(&mut self, other: Self)
421 where
422 Self: Sized,
423 {
424 self.may_be_null |= other.may_be_null;
425 self.may_be_normal |= other.may_be_normal;
426 self.may_be_missing |= other.may_be_missing;
427 self.may_be_duplicate |= other.may_be_duplicate;
428 }
429}
430
431fn schema_cmp(first: &Schema, second: &Schema) -> std::cmp::Ordering {
440 use std::cmp::Ordering::*;
441 use Schema::*;
442 match first {
443 Null(_) => match second {
444 Null(_) => Equal,
445 _ => Less,
446 },
447 Boolean(_) => match second {
448 Null(_) | Boolean(_) => Equal,
449 _ => Less,
450 },
451 Integer(_) => match second {
452 Null(_) | Boolean(_) => Greater,
453 Integer(_) => Equal,
454 _ => Less,
455 },
456 Float(_) => match second {
457 Null(_) | Boolean(_) | Integer(_) => Greater,
458 Float(_) => Equal,
459 _ => Less,
460 },
461 String(_) => match second {
462 Null(_) | Boolean(_) | Integer(_) | Float(_) => Greater,
463 String(_) => Equal,
464 _ => Less,
465 },
466 Bytes(_) => match second {
467 Null(_) | Boolean(_) | Integer(_) | Float(_) | String(_) => Greater,
468 Bytes(_) => Equal,
469 _ => Less,
470 },
471 Sequence { .. } => match second {
472 Null(_) | Boolean(_) | Integer(_) | Float(_) | String(_) | Bytes(_) => Greater,
473 Sequence { .. } => Equal,
474 _ => Less,
475 },
476 Struct { .. } => match second {
477 Null(_)
478 | Boolean(_)
479 | Integer(_)
480 | Float(_)
481 | String(_)
482 | Bytes(_)
483 | Sequence { .. } => Greater,
484 Struct { .. } => Equal,
485 _ => Less,
486 },
487 Union { .. } => match second {
488 Null(_)
489 | Boolean(_)
490 | Integer(_)
491 | Float(_)
492 | String(_)
493 | Bytes(_)
494 | Sequence { .. }
495 | Struct { .. } => Greater,
496 Union { .. } => Equal,
497 },
498 }
499}