avro_rs/
encode.rs

1use crate::{
2    schema::Schema,
3    types::Value,
4    util::{zig_i32, zig_i64},
5};
6use std::convert::TryInto;
7
8/// Encode a `Value` into avro format.
9///
10/// **NOTE** This will not perform schema validation. The value is assumed to
11/// be valid with regards to the schema. Schema are needed only to guide the
12/// encoding for complex type values.
13pub fn encode(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
14    encode_ref(&value, schema, buffer)
15}
16
17fn encode_bytes<B: AsRef<[u8]> + ?Sized>(s: &B, buffer: &mut Vec<u8>) {
18    let bytes = s.as_ref();
19    encode(&Value::Long(bytes.len() as i64), &Schema::Long, buffer);
20    buffer.extend_from_slice(bytes);
21}
22
23fn encode_long(i: i64, buffer: &mut Vec<u8>) {
24    zig_i64(i, buffer)
25}
26
27fn encode_int(i: i32, buffer: &mut Vec<u8>) {
28    zig_i32(i, buffer)
29}
30
31/// Encode a `Value` into avro format.
32///
33/// **NOTE** This will not perform schema validation. The value is assumed to
34/// be valid with regards to the schema. Schema are needed only to guide the
35/// encoding for complex type values.
36pub fn encode_ref(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
37    match value {
38        Value::Null => (),
39        Value::Boolean(b) => buffer.push(if *b { 1u8 } else { 0u8 }),
40        // Pattern | Pattern here to signify that these _must_ have the same encoding.
41        Value::Int(i) | Value::Date(i) | Value::TimeMillis(i) => encode_int(*i, buffer),
42        Value::Long(i)
43        | Value::TimestampMillis(i)
44        | Value::TimestampMicros(i)
45        | Value::TimeMicros(i) => encode_long(*i, buffer),
46        Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
47        Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
48        Value::Decimal(decimal) => match schema {
49            Schema::Decimal { inner, .. } => match *inner.clone() {
50                Schema::Fixed { size, .. } => {
51                    let bytes = decimal.to_sign_extended_bytes_with_len(size).unwrap();
52                    let num_bytes = bytes.len();
53                    if num_bytes != size {
54                        panic!(
55                            "signed decimal bytes length {} not equal to fixed schema size {}",
56                            num_bytes, size
57                        );
58                    }
59                    encode(&Value::Fixed(size, bytes), inner, buffer)
60                }
61                Schema::Bytes => encode(&Value::Bytes(decimal.try_into().unwrap()), inner, buffer),
62                _ => panic!("invalid inner type for decimal: {:?}", inner),
63            },
64            _ => panic!("invalid type for decimal: {:?}", schema),
65        },
66        &Value::Duration(duration) => {
67            let slice: [u8; 12] = duration.into();
68            buffer.extend_from_slice(&slice);
69        }
70        Value::Uuid(uuid) => encode_bytes(&uuid.to_string(), buffer),
71        Value::Bytes(bytes) => match *schema {
72            Schema::Bytes => encode_bytes(bytes, buffer),
73            Schema::Fixed { .. } => buffer.extend(bytes),
74            _ => (),
75        },
76        Value::String(s) => match *schema {
77            Schema::String => {
78                encode_bytes(s, buffer);
79            }
80            Schema::Enum { ref symbols, .. } => {
81                if let Some(index) = symbols.iter().position(|item| item == s) {
82                    encode_int(index as i32, buffer);
83                }
84            }
85            _ => (),
86        },
87        Value::Fixed(_, bytes) => buffer.extend(bytes),
88        Value::Enum(i, _) => encode_int(*i, buffer),
89        Value::Union(item) => {
90            if let Schema::Union(ref inner) = *schema {
91                // Find the schema that is matched here. Due to validation, this should always
92                // return a value.
93                let (idx, inner_schema) = inner
94                    .find_schema(item)
95                    .expect("Invalid Union validation occurred");
96                encode_long(idx as i64, buffer);
97                encode_ref(&*item, inner_schema, buffer);
98            }
99        }
100        Value::Array(items) => {
101            if let Schema::Array(ref inner) = *schema {
102                if !items.is_empty() {
103                    encode_long(items.len() as i64, buffer);
104                    for item in items.iter() {
105                        encode_ref(item, inner, buffer);
106                    }
107                }
108                buffer.push(0u8);
109            }
110        }
111        Value::Map(items) => {
112            if let Schema::Map(ref inner) = *schema {
113                if !items.is_empty() {
114                    encode_long(items.len() as i64, buffer);
115                    for (key, value) in items {
116                        encode_bytes(key, buffer);
117                        encode_ref(value, inner, buffer);
118                    }
119                }
120                buffer.push(0u8);
121            }
122        }
123        Value::Record(fields) => {
124            if let Schema::Record {
125                fields: ref schema_fields,
126                ..
127            } = *schema
128            {
129                for (i, &(_, ref value)) in fields.iter().enumerate() {
130                    encode_ref(value, &schema_fields[i].schema, buffer);
131                }
132            }
133        }
134    }
135}
136
137pub fn encode_to_vec(value: &Value, schema: &Schema) -> Vec<u8> {
138    let mut buffer = Vec::new();
139    encode(&value, schema, &mut buffer);
140    buffer
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// An array with no items must encode to a single zero byte.
    #[test]
    fn test_encode_empty_array() {
        let schema = Schema::Array(Box::new(Schema::Int));
        let value = Value::Array(Vec::<Value>::new());

        let mut encoded = Vec::new();
        encode(&value, &schema, &mut encoded);

        assert_eq!(vec![0u8], encoded);
    }

    /// A map with no entries must encode to a single zero byte.
    #[test]
    fn test_encode_empty_map() {
        let schema = Schema::Map(Box::new(Schema::Int));
        let value = Value::Map(HashMap::<String, Value>::new());

        let mut encoded = Vec::new();
        encode(&value, &schema, &mut encoded);

        assert_eq!(vec![0u8], encoded);
    }
}
171}