1use crate::{
2 schema::Schema,
3 types::Value,
4 util::{zig_i32, zig_i64},
5};
6use std::convert::TryInto;
7
8pub fn encode(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
14 encode_ref(&value, schema, buffer)
15}
16
17fn encode_bytes<B: AsRef<[u8]> + ?Sized>(s: &B, buffer: &mut Vec<u8>) {
18 let bytes = s.as_ref();
19 encode(&Value::Long(bytes.len() as i64), &Schema::Long, buffer);
20 buffer.extend_from_slice(bytes);
21}
22
23fn encode_long(i: i64, buffer: &mut Vec<u8>) {
24 zig_i64(i, buffer)
25}
26
27fn encode_int(i: i32, buffer: &mut Vec<u8>) {
28 zig_i32(i, buffer)
29}
30
31pub fn encode_ref(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
37 match value {
38 Value::Null => (),
39 Value::Boolean(b) => buffer.push(if *b { 1u8 } else { 0u8 }),
40 Value::Int(i) | Value::Date(i) | Value::TimeMillis(i) => encode_int(*i, buffer),
42 Value::Long(i)
43 | Value::TimestampMillis(i)
44 | Value::TimestampMicros(i)
45 | Value::TimeMicros(i) => encode_long(*i, buffer),
46 Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
47 Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
48 Value::Decimal(decimal) => match schema {
49 Schema::Decimal { inner, .. } => match *inner.clone() {
50 Schema::Fixed { size, .. } => {
51 let bytes = decimal.to_sign_extended_bytes_with_len(size).unwrap();
52 let num_bytes = bytes.len();
53 if num_bytes != size {
54 panic!(
55 "signed decimal bytes length {} not equal to fixed schema size {}",
56 num_bytes, size
57 );
58 }
59 encode(&Value::Fixed(size, bytes), inner, buffer)
60 }
61 Schema::Bytes => encode(&Value::Bytes(decimal.try_into().unwrap()), inner, buffer),
62 _ => panic!("invalid inner type for decimal: {:?}", inner),
63 },
64 _ => panic!("invalid type for decimal: {:?}", schema),
65 },
66 &Value::Duration(duration) => {
67 let slice: [u8; 12] = duration.into();
68 buffer.extend_from_slice(&slice);
69 }
70 Value::Uuid(uuid) => encode_bytes(&uuid.to_string(), buffer),
71 Value::Bytes(bytes) => match *schema {
72 Schema::Bytes => encode_bytes(bytes, buffer),
73 Schema::Fixed { .. } => buffer.extend(bytes),
74 _ => (),
75 },
76 Value::String(s) => match *schema {
77 Schema::String => {
78 encode_bytes(s, buffer);
79 }
80 Schema::Enum { ref symbols, .. } => {
81 if let Some(index) = symbols.iter().position(|item| item == s) {
82 encode_int(index as i32, buffer);
83 }
84 }
85 _ => (),
86 },
87 Value::Fixed(_, bytes) => buffer.extend(bytes),
88 Value::Enum(i, _) => encode_int(*i, buffer),
89 Value::Union(item) => {
90 if let Schema::Union(ref inner) = *schema {
91 let (idx, inner_schema) = inner
94 .find_schema(item)
95 .expect("Invalid Union validation occurred");
96 encode_long(idx as i64, buffer);
97 encode_ref(&*item, inner_schema, buffer);
98 }
99 }
100 Value::Array(items) => {
101 if let Schema::Array(ref inner) = *schema {
102 if !items.is_empty() {
103 encode_long(items.len() as i64, buffer);
104 for item in items.iter() {
105 encode_ref(item, inner, buffer);
106 }
107 }
108 buffer.push(0u8);
109 }
110 }
111 Value::Map(items) => {
112 if let Schema::Map(ref inner) = *schema {
113 if !items.is_empty() {
114 encode_long(items.len() as i64, buffer);
115 for (key, value) in items {
116 encode_bytes(key, buffer);
117 encode_ref(value, inner, buffer);
118 }
119 }
120 buffer.push(0u8);
121 }
122 }
123 Value::Record(fields) => {
124 if let Schema::Record {
125 fields: ref schema_fields,
126 ..
127 } = *schema
128 {
129 for (i, &(_, ref value)) in fields.iter().enumerate() {
130 encode_ref(value, &schema_fields[i].schema, buffer);
131 }
132 }
133 }
134 }
135}
136
137pub fn encode_to_vec(value: &Value, schema: &Schema) -> Vec<u8> {
138 let mut buffer = Vec::new();
139 encode(&value, schema, &mut buffer);
140 buffer
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// An empty array must encode to just the single closing zero byte.
    #[test]
    fn test_encode_empty_array() {
        let array_schema = Schema::Array(Box::new(Schema::Int));
        let mut out = Vec::new();

        encode(&Value::Array(Vec::new()), &array_schema, &mut out);

        assert_eq!(vec![0u8], out);
    }

    /// An empty map must encode to just the single closing zero byte.
    #[test]
    fn test_encode_empty_map() {
        let map_schema = Schema::Map(Box::new(Schema::Int));
        let mut out = Vec::new();

        encode(&Value::Map(HashMap::new()), &map_schema, &mut out);

        assert_eq!(vec![0u8], out);
    }
}