Skip to main content

avrow/schema/
mod.rs

//! Contains routines for parsing and validating an Avro schema.
//! Schemas in Avro are written as JSON and can be provided as `.avsc` files
//! to a `Writer` or a `Reader`.
4
5pub mod common;
6#[cfg(test)]
7mod tests;
8use crate::error::AvrowErr;
9pub use common::Order;
10mod canonical;
11pub mod parser;
12pub(crate) use parser::Registry;
13
14use crate::error::AvrowResult;
15use crate::value::Value;
16use canonical::normalize_schema;
17use canonical::CanonicalSchema;
18use common::{Field, Name};
19use indexmap::IndexMap;
20use serde_json::{self, Value as JsonValue};
21use std::fmt::Debug;
22use std::fs::OpenOptions;
23use std::path::Path;
24
/// Typed, in-memory form of a parsed schema, used internally for validating values.
///
/// Primitive schemas are unit variants; complex schemas carry their attributes.
/// Nested schemas (`Map`, `Array`, `Union`) hold their element schemas as further
/// `Variant`s.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum Variant {
    /// The `null` primitive.
    Null,
    /// The `boolean` primitive.
    Boolean,
    /// The `int` primitive.
    Int,
    /// The `long` primitive.
    Long,
    /// The `float` primitive.
    Float,
    /// The `double` primitive.
    Double,
    /// The `bytes` primitive.
    Bytes,
    /// The `string` primitive.
    Str,
    /// A record schema.
    Record {
        /// Name of the record.
        name: Name,
        /// Optional alternative names for the record.
        aliases: Option<Vec<String>>,
        /// Field definitions; validation looks fields up here by field name.
        fields: IndexMap<String, Field>,
    },
    /// A fixed schema.
    Fixed {
        /// Name of the fixed type.
        name: Name,
        /// Length that a value must have to pass validation against this schema.
        size: usize,
    },
    /// An enum schema.
    Enum {
        /// Name of the enum.
        name: Name,
        /// Optional alternative names for the enum.
        aliases: Option<Vec<String>>,
        /// Symbols a value may take; validation checks membership in this list.
        symbols: Vec<String>,
    },
    /// A map schema.
    Map {
        /// Schema that the map's values are validated against.
        values: Box<Variant>,
    },
    /// An array schema.
    Array {
        /// Schema that the array's items are validated against.
        items: Box<Variant>,
    },
    /// A union schema.
    Union {
        /// Member schemas; a value is accepted if it validates against any of them.
        variants: Vec<Variant>,
    },
    /// Reference to a named schema, looked up by this name in the `Registry`
    /// during validation.
    Named(String),
}
60
/// Represents the avro schema used to write encoded avro data.
///
/// Equality between two `Schema`s is decided by their canonical forms only
/// (see the `PartialEq` impl).
#[derive(Debug)]
pub struct Schema {
    // The schema JSON exactly as it was parsed. It is what the `Display` impl
    // prints and what `as_bytes` serializes.
    // TODO can remove this if not needed
    inner: JsonValue,
    // Schema context that has a lookup table to resolve named schema references.
    // Passed along to `Variant::validate` when validating values.
    pub(crate) cxt: Registry,
    // Typed and stripped version of the schema used internally.
    pub(crate) variant: Variant,
    // Canonical form of the schema. This is used for equality.
    pub(crate) canonical: CanonicalSchema,
}
73
74impl PartialEq for Schema {
75    fn eq(&self, other: &Self) -> bool {
76        self.canonical == other.canonical
77    }
78}
79
80impl std::fmt::Display for Schema {
81    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82        std::fmt::Display::fmt(&self.inner, f)
83    }
84}
85
86impl std::str::FromStr for Schema {
87    type Err = AvrowErr;
88    /// Parse an avro schema from a JSON string
89    /// One can use Rust's raw string syntax (r##""##) to pass schema.
90    fn from_str(schema: &str) -> Result<Self, Self::Err> {
91        let schema_json =
92            serde_json::from_str(schema).map_err(|e| AvrowErr::SchemaParseErr(e.into()))?;
93        Schema::parse_imp(schema_json)
94    }
95}
96
97impl Schema {
98    /// Parses an avro schema from a JSON schema in a file.
99    /// Alternatively, one can use the [`FromStr`](https://doc.rust-lang.org/std/str/trait.FromStr.html)
100    /// impl to create the Schema from a JSON string:
101    /// ```
102    /// use std::str::FromStr;
103    /// use avrow::Schema;
104    ///
105    /// let schema = Schema::from_str(r##""null""##).unwrap();
106    /// ```
107    pub fn from_path<P: AsRef<Path> + Debug>(path: P) -> AvrowResult<Self> {
108        let schema_file = OpenOptions::new()
109            .read(true)
110            .open(&path)
111            .map_err(AvrowErr::SchemaParseErr)?;
112        let value =
113            serde_json::from_reader(schema_file).map_err(|e| AvrowErr::SchemaParseErr(e.into()))?;
114        Schema::parse_imp(value)
115    }
116
117    fn parse_imp(schema_json: JsonValue) -> AvrowResult<Self> {
118        let mut parser = Registry::new();
119        let pcf = CanonicalSchema(normalize_schema(&schema_json)?);
120        // TODO see if we can use canonical form to parse variant
121        let variant = parser.parse_schema(&schema_json, None)?;
122        Ok(Schema {
123            inner: schema_json,
124            cxt: parser,
125            variant,
126            canonical: pcf,
127        })
128    }
129
130    pub(crate) fn as_bytes(&self) -> Vec<u8> {
131        format!("{}", self.inner).into_bytes()
132    }
133
134    pub(crate) fn variant(&self) -> &Variant {
135        &self.variant
136    }
137
138    #[inline(always)]
139    pub(crate) fn validate(&self, value: &Value) -> AvrowResult<()> {
140        self.variant.validate(value, &self.cxt)
141    }
142
143    /// Returns the canonical form of an Avro schema.
144    /// Example:
145    /// ```rust
146    /// use avrow::Schema;
147    /// use std::str::FromStr;
148    ///
149    /// let schema = Schema::from_str(r##"
150    ///     {
151    ///         "type": "record",
152    ///         "name": "LongList",
153    ///         "aliases": ["LinkedLongs"],
154    ///         "fields" : [
155    ///             {"name": "value", "type": "long"},
156    ///             {"name": "next", "type": ["null", "LongList"]
157    ///         }]
158    ///     }
159    /// "##).unwrap();
160    ///
161    /// let canonical = schema.canonical_form();
162    /// ```
163    pub fn canonical_form(&self) -> &CanonicalSchema {
164        &self.canonical
165    }
166}
167
168impl Variant {
169    pub fn validate(&self, value: &Value, cxt: &Registry) -> AvrowResult<()> {
170        let variant = self;
171        match (value, variant) {
172            (Value::Null, Variant::Null)
173            | (Value::Boolean(_), Variant::Boolean)
174            | (Value::Int(_), Variant::Int)
175            // long is promotable to float or double
176            | (Value::Long(_), Variant::Long)
177            | (Value::Long(_), Variant::Float)
178            | (Value::Long(_), Variant::Double)
179            // int is promotable to long, float or double
180            | (Value::Int(_), Variant::Long)
181            | (Value::Int(_), Variant::Float)
182            | (Value::Int(_), Variant::Double)
183            | (Value::Float(_), Variant::Float)
184            // float is promotable to double
185            | (Value::Float(_), Variant::Double)
186            | (Value::Double(_), Variant::Double)
187            | (Value::Str(_), Variant::Str)
188            // string is promotable to bytes
189            | (Value::Str(_), Variant::Bytes)
190            // bytes is promotable to string
191            | (Value::Bytes(_), Variant::Str)
192            | (Value::Bytes(_), Variant::Bytes) => {},
193            (Value::Fixed(v), Variant::Fixed { size, .. })
194            | (Value::Bytes(v), Variant::Fixed { size, .. }) => {
195                if v.len() != *size {
196                    return Err(AvrowErr::FixedValueLenMismatch {
197                        found: v.len(),
198                        expected: *size,
199                    });
200                }
201            }
202            (Value::Record(rec), Variant::Record { ref fields, .. }) => {
203                for (fname, fvalue) in &rec.fields {
204                    if let Some(ftype) = fields.get(fname) {
205                        ftype.ty.validate(&fvalue.value, cxt)?;
206                    } else {
207                        return Err(AvrowErr::RecordFieldMissing);
208                    }
209                }
210            }
211            (Value::Map(hmap), Variant::Map { values }) => {
212                return if let Some(v) = hmap.values().next() {
213                    values.validate(v, cxt)
214                } else {
215                    Err(AvrowErr::EmptyMap)
216                }
217            }
218            (Value::Enum(sym), Variant::Enum { symbols, .. }) if symbols.contains(sym) => {
219                return Ok(())
220            }
221            (Value::Array(item), Variant::Array { items }) => {
222                return if let Some(v) = item.first() {
223                    items.validate(v, cxt)
224                } else {
225                    Err(AvrowErr::EmptyArray)
226                }
227            }
228            (v, Variant::Named(name)) => {
229                if let Some(schema) = cxt.get(&name) {
230                    if schema.validate(v, cxt).is_ok() {
231                        return Ok(());
232                    }
233                }
234                return Err(AvrowErr::NamedSchemaNotFoundForValue)
235            }
236            // Value `a` can be any of the above schemas + any named schema in the schema registry
237            (a, Variant::Union { variants }) => {
238                for s in variants.iter() {
239                    if s.validate(a, cxt).is_ok() {
240                        return Ok(());
241                    }
242                }
243
244                return Err(AvrowErr::NotFoundInUnion)
245            }
246
247            (v, s) => {
248                return Err(AvrowErr::SchemaDataValidationFailed(
249                    format!("{:?}", v),
250                    format!("{:?}", s),
251                ))
252            }
253        }
254
255        Ok(())
256    }
257
258    fn get_named_mut(&mut self) -> Option<&mut Name> {
259        match self {
260            Variant::Record { name, .. }
261            | Variant::Fixed { name, .. }
262            | Variant::Enum { name, .. } => Some(name),
263            _ => None,
264        }
265    }
266}