mz_avro/
encode.rs

1// Copyright 2018 Flavien Raynaud.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License in the LICENSE file at the
7// root of this repository, or online at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// This file is derived from the avro-rs project, available at
18// https://github.com/flavray/avro-rs. It was incorporated
19// directly into Materialize on March 3, 2020.
20//
21// The original source code is subject to the terms of the MIT license, a copy
22// of which can be found in the LICENSE file at the root of this repository.
23
24use crate::schema::{Schema, SchemaNode, SchemaPiece};
25use crate::types::AvroMap;
26use crate::types::{DecimalValue, Value};
27use crate::util::{zig_i32, zig_i64};
28
29/// Encode a `Value` into avro format.
30///
31/// **NOTE** This will not perform schema validation. The value is assumed to
32/// be valid with regards to the schema. Schema are needed only to guide the
33/// encoding for complex type values.
34pub fn encode(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
35    encode_ref(&value, schema.top_node(), buffer)
36}
37
38fn encode_bytes<B: AsRef<[u8]> + ?Sized>(s: &B, buffer: &mut Vec<u8>) {
39    let bytes = s.as_ref();
40    encode(
41        &Value::Long(bytes.len() as i64),
42        &Schema {
43            named: vec![],
44            indices: Default::default(),
45            top: SchemaPiece::Long.into(),
46        },
47        buffer,
48    );
49    buffer.extend_from_slice(bytes);
50}
51
52fn encode_long(i: i64, buffer: &mut Vec<u8>) {
53    zig_i64(i, buffer)
54}
55
56fn encode_int(i: i32, buffer: &mut Vec<u8>) {
57    zig_i32(i, buffer)
58}
59
60/// Encode a `Value` into avro format.
61///
62/// **NOTE** This will not perform schema validation. The value is assumed to
63/// be valid with regards to the schema. Schema are needed only to guide the
64/// encoding for complex type values.
65pub fn encode_ref(value: &Value, schema: SchemaNode, buffer: &mut Vec<u8>) {
66    match value {
67        Value::Null => (),
68        Value::Boolean(b) => buffer.push(if *b { 1u8 } else { 0u8 }),
69        Value::Int(i) => encode_int(*i, buffer),
70        Value::Long(i) => encode_long(*i, buffer),
71        Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
72        Value::Date(d) => {
73            let span = (*d) - chrono::NaiveDate::from_ymd(1970, 1, 1);
74            encode_int(
75                span.num_days()
76                    .try_into()
77                    .expect("Num days is too large to encode as i32"),
78                buffer,
79            )
80        }
81        Value::Timestamp(d) => {
82            let mult = match schema.inner {
83                SchemaPiece::TimestampMilli => 1_000,
84                SchemaPiece::TimestampMicro => 1_000_000,
85                other => panic!("Invalid schema for timestamp: {:?}", other),
86            };
87            let ts_seconds = d
88                .timestamp()
89                .checked_mul(mult)
90                .expect("All chrono dates can be converted to timestamps");
91            let sub_part: i64 = if mult == 1_000 {
92                d.timestamp_subsec_millis().into()
93            } else {
94                d.timestamp_subsec_micros().into()
95            };
96            let ts = if ts_seconds >= 0 {
97                ts_seconds + sub_part
98            } else {
99                ts_seconds - sub_part
100            };
101            encode_long(ts, buffer)
102        }
103        Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
104        Value::Decimal(DecimalValue { unscaled, .. }) => match schema.name {
105            None => encode_bytes(unscaled, buffer),
106            Some(_) => buffer.extend(unscaled),
107        },
108        Value::Bytes(bytes) => encode_bytes(bytes, buffer),
109        Value::String(s) => match schema.inner {
110            SchemaPiece::String => {
111                encode_bytes(s, buffer);
112            }
113            SchemaPiece::Enum { symbols, .. } => {
114                if let Some(index) = symbols.iter().position(|item| item == s) {
115                    encode_int(index as i32, buffer);
116                }
117            }
118            _ => (),
119        },
120        Value::Fixed(_, bytes) => buffer.extend(bytes),
121        Value::Enum(i, _) => encode_int(*i as i32, buffer),
122        Value::Union { index, inner, .. } => {
123            if let SchemaPiece::Union(schema_inner) = schema.inner {
124                let schema_inner = &schema_inner.variants()[*index];
125                encode_long(*index as i64, buffer);
126                encode_ref(&*inner, schema.step(schema_inner), buffer);
127            }
128        }
129        Value::Array(items) => {
130            if let SchemaPiece::Array(inner) = schema.inner {
131                if !items.is_empty() {
132                    encode_long(items.len() as i64, buffer);
133                    for item in items.iter() {
134                        encode_ref(item, schema.step(&**inner), buffer);
135                    }
136                }
137                buffer.push(0u8);
138            }
139        }
140        Value::Map(AvroMap(items)) => {
141            if let SchemaPiece::Map(inner) = schema.inner {
142                if !items.is_empty() {
143                    encode_long(items.len() as i64, buffer);
144                    for (key, value) in items {
145                        encode_bytes(key, buffer);
146                        encode_ref(value, schema.step(&**inner), buffer);
147                    }
148                }
149                buffer.push(0u8);
150            }
151        }
152        Value::Record(fields) => {
153            if let SchemaPiece::Record {
154                fields: inner_fields,
155                ..
156            } = schema.inner
157            {
158                for (i, &(_, ref value)) in fields.iter().enumerate() {
159                    encode_ref(value, schema.step(&inner_fields[i].schema), buffer);
160                }
161            }
162        }
163        Value::Json(j) => {
164            encode_bytes(&j.to_string(), buffer);
165        }
166        Value::Uuid(u) => {
167            let u_str = u.to_string();
168            encode_bytes(&u_str, buffer);
169        }
170    }
171}
172
173pub fn encode_to_vec(value: &Value, schema: &Schema) -> Vec<u8> {
174    let mut buffer = Vec::new();
175    encode(&value, schema, &mut buffer);
176    buffer
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// An empty array encodes to a single zero byte.
    #[test]
    fn test_encode_empty_array() {
        let schema: Schema = r#"{"type": "array", "items": "int"}"#.parse().unwrap();
        let value = Value::Array(Vec::new());

        let mut encoded = Vec::new();
        encode(&value, &schema, &mut encoded);

        assert_eq!(vec![0u8], encoded);
    }

    /// An empty map encodes to a single zero byte.
    #[test]
    fn test_encode_empty_map() {
        let schema: Schema = r#"{"type": "map", "values": "int"}"#.parse().unwrap();
        let value = Value::Map(AvroMap(HashMap::new()));

        let mut encoded = Vec::new();
        encode(&value, &schema, &mut encoded);

        assert_eq!(vec![0u8], encoded);
    }
}
207}