1use error::{Error, ErrorKind, Result, ResultExt};
2
3use linked_hash_map;
4use serde_json;
5use std::collections;
6
/// Handle identifying a schema stored in a `SchemaRegistry`.
///
/// The wrapped number is the position of the schema in the registry's
/// internal schema list.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct SchemaId(usize);
9
10#[derive(Clone, Debug)]
11pub enum SchemaRef {
12 Direct(Schema),
13 Indirect(SchemaId),
14}
15
16#[derive(Clone, Debug)]
17pub enum Schema {
18 Null,
19 Boolean,
20 Int,
21 Long,
22 Float,
23 Double,
24 Bytes,
25 String,
26 Record(RecordSchema),
27 Enum(EnumSchema),
28 Array(Box<SchemaRef>),
29 Map(Box<SchemaRef>),
30 Union(Vec<SchemaRef>),
31 Fixed(FixedSchema),
32}
33
34#[derive(Clone, Debug)]
35pub struct SchemaRegistry {
36 schemata: Vec<Schema>,
37 next_id: usize,
38 schemata_by_name: collections::HashMap<String, SchemaId>,
39}
40
41pub struct RecordFields<'a>(linked_hash_map::Values<'a, String, FieldSchema>);
42
43#[derive(Clone, Debug)]
44pub struct RecordSchema {
45 name: String,
46 doc: Option<String>,
47 fields: linked_hash_map::LinkedHashMap<String, FieldSchema>,
48}
49
50#[derive(Clone, Debug)]
51pub struct FieldSchema {
52 name: String,
53 doc: Option<String>,
54 field_type: SchemaRef,
55 default: Option<serde_json::Value>,
56}
57
/// Description of an enum type: its name, optional documentation and symbols.
#[derive(Debug, Clone)]
pub struct EnumSchema {
    /// Name of the enum (the parser stores the namespace-qualified name here).
    name: String,
    /// Optional free-form documentation.
    doc: Option<String>,
    /// The enum's symbols, in declaration order.
    symbols: Vec<String>,
}
64
/// Description of a fixed type: its name, optional documentation and size.
#[derive(Debug, Clone)]
pub struct FixedSchema {
    /// Name of the type (the parser stores the namespace-qualified name here).
    name: String,
    /// Optional free-form documentation.
    doc: Option<String>,
    /// The declared `size` of the type.
    size: i32,
}
71
72lazy_static! {
73 pub static ref EMPTY_REGISTRY: SchemaRegistry = {
74 SchemaRegistry::new()
75 };
76
77 pub static ref FILE_HEADER: Schema = {
78 let mut fields = linked_hash_map::LinkedHashMap::new();
79
80 fields.insert("magic".to_owned(), FieldSchema {
81 name: "magic".to_owned(),
82 doc: None,
83 default: None,
84 field_type: SchemaRef::Direct(Schema::Fixed(FixedSchema {
85 name: "org.apache.avro.file.Magic".to_owned(),
86 doc: None,
87 size: 4,
88 })),
89 });
90
91 fields.insert("meta".to_owned(), FieldSchema {
92 name: "meta".to_owned(),
93 doc: None,
94 default: None,
95 field_type: SchemaRef::Direct(Schema::Map(Box::new(SchemaRef::Direct(Schema::Bytes)))),
96 });
97
98 fields.insert("sync".to_owned(), FieldSchema {
99 name: "sync".to_owned(),
100 doc: None,
101 default: None,
102 field_type: SchemaRef::Direct(Schema::Fixed(FixedSchema {
103 name: "org.apache.avro.file.Sync".to_owned(),
104 doc: None,
105 size: 16,
106 })),
107 });
108
109 Schema::Record(RecordSchema {
110 name: "org.apache.avro.file.Header".to_owned(),
111 doc: None,
112 fields: fields,
113 })
114 };
115}
116
117impl SchemaRef {
118 pub fn resolve<'a>(&'a self, registry: &'a SchemaRegistry) -> &'a Schema {
119 match *self {
120 SchemaRef::Direct(ref schema) => schema,
121 SchemaRef::Indirect(id) => ®istry.schemata[id.0],
122 }
123 }
124
125 pub fn into_resolved(self, registry: &SchemaRegistry) -> Schema {
126 match self {
127 SchemaRef::Direct(schema) => schema,
128 SchemaRef::Indirect(id) => registry.schemata[id.0].clone(),
129 }
130 }
131}
132
133impl SchemaRegistry {
134 pub fn new() -> SchemaRegistry {
135 SchemaRegistry {
136 schemata: Vec::new(),
137 next_id: 0,
138 schemata_by_name: collections::HashMap::new(),
139 }
140 }
141
142 pub fn from_json(json: &serde_json::Value) -> Result<(SchemaRegistry, Option<SchemaRef>)> {
143 let mut result = SchemaRegistry::new();
144 let r = result.add_json(json)?;
145 Ok((result, r))
146 }
147
148 pub fn add_json(&mut self, json: &serde_json::Value) -> Result<Option<SchemaRef>> {
149 match json {
150 &serde_json::Value::Array(ref vs) => {
151 for v in vs {
152 self.create_schema_ref(None, v)?;
153 }
154 Ok(None)
155 },
156 _ => self.create_schema_ref(None, json).map(Some),
157 }
158 }
159
160 pub fn schema_by_name(&self, name: &str) -> Option<&Schema> {
161 self.schemata_by_name.get(name).map(|id| &self.schemata[id.0])
162 }
163
164 fn create_schema_ref(&mut self,
165 namespace: Option<&str>,
166 json: &serde_json::Value)
167 -> Result<SchemaRef> {
168 use serde_json::Value;
169 use serde_json::Value::*;
170
171 match *json {
172 String(ref name) => {
173 if let Some(primitive) = primitive_schema(name) {
174 Ok(primitive)
175 } else {
176 registered_schema(&self.schemata_by_name, namespace, name)
177 }
178 },
179 Object(ref obj) => {
180 let name = obj.get("type")
181 .and_then(Value::as_str)
182 .ok_or(Error::from(ErrorKind::InvalidSchema))
183 .chain_err(|| ErrorKind::FieldTypeMismatch("type", "string"))?;
184 if let Some(primitive) = primitive_schema(name) {
185 Ok(primitive)
186 } else {
187 match name {
188 "record" => self.create_record(namespace, obj),
189 "enum" => self.create_enum(namespace, obj),
190 "array" => self.create_array(namespace, obj),
191 "map" => self.create_map(namespace, obj),
192 "fixed" => self.create_fixed(namespace, obj),
193 _ => registered_schema(&self.schemata_by_name, namespace, name),
194 }
195 }
196 },
197 Array(ref elems) => {
198 let schemas =
199 elems.iter().map(|e| self.create_schema_ref(namespace, e)).collect::<Result<_>>()?;
200 Ok(SchemaRef::Direct(Schema::Union(schemas)))
201 },
202 _ => {
203 Err(Error::from(ErrorKind::InvalidSchema))
204 .chain_err(|| ErrorKind::FieldTypeMismatch("type", "string, object or array"))
205 },
206 }
207 }
208
209 fn create_record(&mut self,
210 namespace: Option<&str>,
211 obj: &serde_json::Map<String, serde_json::Value>)
212 -> Result<SchemaRef> {
213 let (namespace, schema_name) = full_name(namespace, obj)?;
214 let schema_id = self.alloc_schema_name(schema_name.clone())?;
215 self.schemata.push(Schema::Null);
217
218 let fields = obj.get("fields")
219 .ok_or(Error::from(ErrorKind::InvalidSchema))
220 .chain_err(|| ErrorKind::RequiredFieldMissing("fields"))
221 .and_then(|v| {
222 v.as_array()
223 .ok_or(Error::from(ErrorKind::InvalidSchema))
224 .chain_err(|| ErrorKind::FieldTypeMismatch("fields", "array"))
225 })
226 .and_then(|vs| {
227 vs.iter()
228 .map(|v| self.create_field(namespace, v))
229 .collect()
230 })?;
231
232 let doc = obj.get("doc")
233 .map(|v| {
234 v.as_str()
235 .map(ToOwned::to_owned)
236 .map(Some)
237 .ok_or(Error::from(ErrorKind::InvalidSchema))
238 .chain_err(|| ErrorKind::FieldTypeMismatch("doc", "string")) as
239 Result<Option<String>>
240 })
241 .unwrap_or(Ok(None))?;
242
243 self.schemata[schema_id.0] = Schema::Record(RecordSchema {
244 name: schema_name,
245 doc: doc,
246 fields: fields,
247 });
248
249 Ok(SchemaRef::Indirect(schema_id))
250 }
251
252 fn create_enum(&mut self,
253 namespace: Option<&str>,
254 obj: &serde_json::Map<String, serde_json::Value>)
255 -> Result<SchemaRef> {
256 let (_, schema_name) = full_name(namespace, obj)?;
257 let schema_id = self.alloc_schema_name(schema_name.clone())?;
258
259 let doc = obj.get("doc")
260 .map(|v| {
261 v.as_str()
262 .map(ToOwned::to_owned)
263 .map(Some)
264 .ok_or(Error::from(ErrorKind::InvalidSchema))
265 .chain_err(|| ErrorKind::FieldTypeMismatch("doc", "string")) as
266 Result<Option<String>>
267 })
268 .unwrap_or(Ok(None))?;
269
270 let symbols_array = obj.get("symbols")
271 .ok_or(Error::from(ErrorKind::InvalidSchema))
272 .chain_err(|| ErrorKind::RequiredFieldMissing("symbols"))?;
273 let symbols = symbols_array.as_array()
274 .and_then(|vs| vs.iter().map(|v| v.as_str().map(|s| s.to_owned())).collect())
275 .ok_or(Error::from(ErrorKind::InvalidSchema))
276 .chain_err(|| ErrorKind::FieldTypeMismatch("symbols", "array of strings"))?;
277
278 self.schemata.push(Schema::Enum(EnumSchema {
279 name: schema_name,
280 doc: doc,
281 symbols: symbols,
282 }));
283
284 Ok(SchemaRef::Indirect(schema_id))
285 }
286
287 fn create_array(&mut self,
288 namespace: Option<&str>,
289 obj: &serde_json::Map<String, serde_json::Value>)
290 -> Result<SchemaRef> {
291 let items = obj.get("items")
292 .ok_or(Error::from(ErrorKind::InvalidSchema))
293 .chain_err(|| ErrorKind::RequiredFieldMissing("items"))?;
294 let items_schema = self.create_schema_ref(namespace, items)?;
295
296 Ok(SchemaRef::Direct(Schema::Array(Box::new(items_schema))))
297 }
298
299 fn create_map(&mut self,
300 namespace: Option<&str>,
301 obj: &serde_json::Map<String, serde_json::Value>)
302 -> Result<SchemaRef> {
303 let values = obj.get("values")
304 .ok_or(Error::from(ErrorKind::InvalidSchema))
305 .chain_err(|| ErrorKind::RequiredFieldMissing("values"))?;
306 let values_schema = self.create_schema_ref(namespace, values)?;
307
308 Ok(SchemaRef::Direct(Schema::Map(Box::new(values_schema))))
309 }
310
311 fn create_fixed(&mut self,
312 namespace: Option<&str>,
313 obj: &serde_json::Map<String, serde_json::Value>)
314 -> Result<SchemaRef> {
315 let (_, schema_name) = full_name(namespace, obj)?;
316 let schema_id = self.alloc_schema_name(schema_name.clone())?;
317
318 let doc = obj.get("doc")
319 .map(|v| {
320 v.as_str()
321 .map(ToOwned::to_owned)
322 .map(Some)
323 .ok_or(Error::from(ErrorKind::InvalidSchema))
324 .chain_err(|| ErrorKind::FieldTypeMismatch("doc", "string"))
325 })
326 .unwrap_or(Ok(None))?;
327
328 let size = obj.get("size")
329 .and_then(serde_json::Value::as_i64)
330 .ok_or(Error::from(ErrorKind::InvalidSchema))
331 .chain_err(|| ErrorKind::RequiredFieldMissing("size"))?;
332
333 self.schemata.push(Schema::Fixed(FixedSchema {
334 name: schema_name,
335 doc: doc,
336 size: size as i32,
337 }));
338
339 Ok(SchemaRef::Indirect(schema_id))
340 }
341
342 fn alloc_schema_name(&mut self, name: String) -> Result<SchemaId> {
343 use std::collections::hash_map::Entry;
344
345 match self.schemata_by_name.entry(name) {
346 Entry::Occupied(e) => Err(Error::from(ErrorKind::DuplicateSchema(e.key().clone()))),
347 Entry::Vacant(e) => {
348 let schema_id = SchemaId(self.next_id);
349 self.next_id += 1;
350
351 e.insert(schema_id);
352
353 Ok(schema_id)
354 },
355 }
356 }
357
358 fn create_field(&mut self,
359 namespace: Option<&str>,
360 json: &serde_json::Value)
361 -> Result<(String, FieldSchema)> {
362 let name = json.get("name")
363 .and_then(serde_json::Value::as_str)
364 .ok_or(Error::from(ErrorKind::InvalidSchema))
365 .chain_err(|| ErrorKind::RequiredFieldMissing("name"))?;
366 let doc = json.get("doc")
367 .map(|v| {
368 v.as_str()
369 .map(ToOwned::to_owned)
370 .map(Some)
371 .ok_or(Error::from(ErrorKind::InvalidSchema))
372 .chain_err(|| ErrorKind::FieldTypeMismatch("doc", "string"))
373 })
374 .unwrap_or(Ok(None))?;
375 let field_type = json.get("type")
376 .ok_or(Error::from(ErrorKind::InvalidSchema))
377 .chain_err(|| ErrorKind::RequiredFieldMissing("name"))
378 .and_then(|t| self.create_schema_ref(namespace, t))?;
379
380 let schema = FieldSchema {
381 name: name.to_owned(),
382 doc: doc,
383 field_type: field_type,
384 default: json.get("default").cloned(),
385 };
386
387 Ok((name.to_owned(), schema))
388 }
389}
390
391impl<'a> Iterator for RecordFields<'a> {
392 type Item = &'a FieldSchema;
393
394 fn next(&mut self) -> Option<&'a FieldSchema> {
395 self.0.next()
396 }
397}
398
399impl<'a> ExactSizeIterator for RecordFields<'a> {
400 fn len(&self) -> usize {
401 self.0.len()
402 }
403}
404
405impl RecordSchema {
406 pub fn name(&self) -> &str {
407 &self.name
408 }
409
410 pub fn doc(&self) -> Option<&str> {
411 if let Some(ref doc) = self.doc {
412 Some(doc.as_str())
413 } else {
414 None
415 }
416 }
417
418 pub fn fields(&self) -> RecordFields {
419 RecordFields(self.fields.values())
420 }
421
422 pub fn field_by_name(&self, name: &str) -> Option<&FieldSchema> {
423 self.fields.get(name)
424 }
425}
426
427impl FieldSchema {
428 pub fn name(&self) -> &str {
429 &self.name
430 }
431
432 pub fn doc(&self) -> Option<&str> {
433 if let Some(ref doc) = self.doc {
434 Some(doc.as_str())
435 } else {
436 None
437 }
438 }
439
440 pub fn field_type(&self) -> &SchemaRef {
441 &self.field_type
442 }
443
444 pub fn default(&self) -> Option<&serde_json::Value> {
445 if let Some(ref default) = self.default {
446 Some(default)
447 } else {
448 None
449 }
450 }
451}
452
453impl EnumSchema {
454 pub fn name(&self) -> &str {
455 &self.name
456 }
457
458 pub fn doc(&self) -> Option<&str> {
459 if let Some(ref doc) = self.doc {
460 Some(doc.as_str())
461 } else {
462 None
463 }
464 }
465
466 pub fn symbols(&self) -> &[String] {
467 &self.symbols
468 }
469}
470
471impl FixedSchema {
472 pub fn name(&self) -> &str {
473 &self.name
474 }
475
476 pub fn doc(&self) -> Option<&str> {
477 if let Some(ref doc) = self.doc {
478 Some(doc.as_str())
479 } else {
480 None
481 }
482 }
483
484 pub fn size(&self) -> i32 {
485 self.size
486 }
487}
488
489fn full_name<'a>(namespace: Option<&'a str>,
490 obj: &'a serde_json::Map<String, serde_json::Value>)
491 -> Result<(Option<&'a str>, String)> {
492 let namespace = obj.get("namespace")
493 .map(|v| {
494 v.as_str()
495 .map(Some)
496 .ok_or(Error::from(ErrorKind::InvalidSchema))
497 .chain_err(|| ErrorKind::FieldTypeMismatch("namespace", "string"))
498 })
499 .unwrap_or(Ok(namespace))?;
500
501 let name = obj.get("name")
502 .and_then(serde_json::Value::as_str)
503 .ok_or(Error::from(ErrorKind::InvalidSchema))
504 .chain_err(|| ErrorKind::RequiredFieldMissing("name"))?;
505
506 if let Some(ns) = namespace {
507 Ok((Some(ns), format!("{}.{}", ns, name)))
508 } else {
509 Ok((namespace, name.to_owned()))
510 }
511}
512
513fn registered_schema(registry: &collections::HashMap<String, SchemaId>,
514 namespace: Option<&str>,
515 name: &str)
516 -> Result<SchemaRef> {
517 match registry.get(name) {
518 Some(id) => Ok(SchemaRef::Indirect(*id)),
519 None => {
520 match namespace.and_then(|ns| registry.get(&format!("{}.{}", ns, name))) {
521 Some(id) => Ok(SchemaRef::Indirect(*id)),
522 None => Err(ErrorKind::NoSuchType(name.to_owned()).into()),
523 }
524 },
525 }
526}
527
528fn primitive_schema(name: &str) -> Option<SchemaRef> {
529 match name {
530 "null" => Some(SchemaRef::Direct(Schema::Null)),
531 "boolean" => Some(SchemaRef::Direct(Schema::Boolean)),
532 "int" => Some(SchemaRef::Direct(Schema::Int)),
533 "long" => Some(SchemaRef::Direct(Schema::Long)),
534 "float" => Some(SchemaRef::Direct(Schema::Float)),
535 "double" => Some(SchemaRef::Direct(Schema::Double)),
536 "bytes" => Some(SchemaRef::Direct(Schema::Bytes)),
537 "string" => Some(SchemaRef::Direct(Schema::String)),
538 _ => None,
539 }
540}
541
#[cfg(test)]
mod test {

    use super::*;
    use serde_json;

    /// Parses `text` as JSON and builds a registry from it.
    ///
    /// Panics with a descriptive message if either step fails.
    fn registry_from(text: &str) -> SchemaRegistry {
        let json: serde_json::Value =
            serde_json::from_str(text).expect("test schema must be valid JSON");
        let (registry, _) =
            SchemaRegistry::from_json(&json).expect("test schema must be accepted");
        registry
    }

    /// Checks that `example.avro.User` is a record whose `parent` field
    /// resolves back to the same record.
    fn assert_user_is_its_own_parent(registry: &SchemaRegistry) {
        match registry.schema_by_name("example.avro.User") {
            Some(&Schema::Record(ref record)) => {
                assert_eq!("example.avro.User", record.name());

                let parent = record.field_by_name("parent")
                    .expect("record must have a `parent` field")
                    .field_type()
                    .resolve(registry);
                match *parent {
                    Schema::Record(ref parent_record) => {
                        assert_eq!("example.avro.User", parent_record.name());
                    },
                    ref other => panic!("expected `parent` to be a record, found {:?}", other),
                }
            },
            other => panic!("expected a record schema, found {:?}", other),
        }
    }

    #[test]
    fn parse_record_schema() {
        let schema_registry = registry_from(r#"
            {
                "namespace": "example.avro",
                "type": "record",
                "name": "Record",
                "fields": [
                    {"name": "null", "type": "null"},
                    {"name": "boolean", "type": "boolean"},
                    {"name": "int", "type": "int"},
                    {"name": "long", "type": "long"},
                    {"name": "float", "type": "float"},
                    {"name": "double", "type": "double"},
                    {"name": "bytes", "type": "bytes"},
                    {"name": "string", "type": "string"},
                    {
                        "name": "record",
                        "type": {
                            "type": "record",
                            "name": "SubRecord",
                            "fields": [
                                {"name": "null", "type": "null"},
                                {"name": "boolean", "type": "boolean"},
                                {"name": "int", "type": "int"},
                                {"name": "long", "type": "long"},
                                {"name": "float", "type": "float"},
                                {"name": "double", "type": "double"},
                                {"name": "bytes", "type": "bytes"},
                                {"name": "string", "type": "string"},
                                {
                                    "name": "enum",
                                    "type": {
                                        "type": "enum",
                                        "name": "SubEnum",
                                        "symbols": ["A", "B"]
                                    }
                                },
                                {
                                    "name": "array",
                                    "type": {
                                        "type": "array",
                                        "items": "string"
                                    }
                                },
                                {
                                    "name": "map",
                                    "type": {
                                        "type": "map",
                                        "values": "string"
                                    }
                                },
                                {
                                    "name": "union",
                                    "type": ["null", "string", "int"]
                                },
                                {
                                    "name": "fixed",
                                    "type": {
                                        "namespace": "example.avro",
                                        "type": "fixed",
                                        "name": "SubId",
                                        "size": 32
                                    }
                                }
                            ]
                        }
                    },
                    {
                        "name": "enum",
                        "type": {
                            "type": "enum",
                            "name": "Enum",
                            "symbols": ["A", "B"]
                        }
                    },
                    {
                        "name": "array",
                        "type": {
                            "type": "array",
                            "items": "string"
                        }
                    },
                    {
                        "name": "map",
                        "type": {
                            "type": "map",
                            "values": "string"
                        }
                    },
                    {
                        "name": "union",
                        "type": ["null", "string", "int"]
                    },
                    {
                        "name": "fixed",
                        "type": {
                            "namespace": "example.avro",
                            "type": "fixed",
                            "name": "Id",
                            "size": 32
                        }
                    }
                ]
            }
        "#);

        match schema_registry.schema_by_name("example.avro.Record") {
            Some(&Schema::Record(ref record)) => {
                // Fields must come back in declaration order.
                let names: Vec<&str> = record.fields().map(|field| field.name()).collect();
                assert_eq!(vec!["null", "boolean", "int", "long", "float", "double", "bytes",
                                "string", "record", "enum", "array", "map", "union", "fixed"],
                           names);
            },
            other => panic!("expected a record schema, found {:?}", other),
        }
    }

    #[test]
    fn parse_recursive_record_schema() {
        let schema_registry = registry_from(r#"
            {
                "namespace": "example.avro",
                "type": "record",
                "name": "User",
                "fields": [
                    {"name": "parent", "type": "User"}
                ]
            }
        "#);

        assert_user_is_its_own_parent(&schema_registry);
    }

    #[test]
    fn parse_recursive_qualified_record_schema() {
        let schema_registry = registry_from(r#"
            {
                "namespace": "example.avro",
                "type": "record",
                "name": "User",
                "fields": [
                    {"name": "parent", "type": "example.avro.User"}
                ]
            }
        "#);

        assert_user_is_its_own_parent(&schema_registry);
    }

    #[test]
    fn parse_enum_schema() {
        let schema_registry = registry_from(r#"
            {
                "namespace": "example.avro",
                "type": "enum",
                "name": "User",
                "symbols": ["Adam", "Eve"]
            }
        "#);

        match schema_registry.schema_by_name("example.avro.User") {
            Some(&Schema::Enum(ref enu)) => {
                assert_eq!("example.avro.User", enu.name());
                assert_eq!(&["Adam".to_owned(), "Eve".to_owned()], enu.symbols());
            },
            other => panic!("expected an enum schema, found {:?}", other),
        }
    }

    #[test]
    fn parse_fixed_schema() {
        let schema_registry = registry_from(r#"
            {
                "namespace": "example.avro",
                "type": "fixed",
                "name": "Id",
                "size": 32
            }
        "#);

        match schema_registry.schema_by_name("example.avro.Id") {
            Some(&Schema::Fixed(ref fixed)) => {
                assert_eq!("example.avro.Id", fixed.name());
                assert_eq!(32, fixed.size());
            },
            other => panic!("expected a fixed schema, found {:?}", other),
        }
    }
}