1use crate::schema::{Schema, SchemaNode, SchemaPiece};
25use crate::types::AvroMap;
26use crate::types::{DecimalValue, Value};
27use crate::util::{zig_i32, zig_i64};
28
29pub fn encode(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
35 encode_ref(&value, schema.top_node(), buffer)
36}
37
38fn encode_bytes<B: AsRef<[u8]> + ?Sized>(s: &B, buffer: &mut Vec<u8>) {
39 let bytes = s.as_ref();
40 encode(
41 &Value::Long(bytes.len() as i64),
42 &Schema {
43 named: vec![],
44 indices: Default::default(),
45 top: SchemaPiece::Long.into(),
46 },
47 buffer,
48 );
49 buffer.extend_from_slice(bytes);
50}
51
52fn encode_long(i: i64, buffer: &mut Vec<u8>) {
53 zig_i64(i, buffer)
54}
55
56fn encode_int(i: i32, buffer: &mut Vec<u8>) {
57 zig_i32(i, buffer)
58}
59
60pub fn encode_ref(value: &Value, schema: SchemaNode, buffer: &mut Vec<u8>) {
66 match value {
67 Value::Null => (),
68 Value::Boolean(b) => buffer.push(if *b { 1u8 } else { 0u8 }),
69 Value::Int(i) => encode_int(*i, buffer),
70 Value::Long(i) => encode_long(*i, buffer),
71 Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
72 Value::Date(d) => {
73 let span = (*d) - chrono::NaiveDate::from_ymd(1970, 1, 1);
74 encode_int(
75 span.num_days()
76 .try_into()
77 .expect("Num days is too large to encode as i32"),
78 buffer,
79 )
80 }
81 Value::Timestamp(d) => {
82 let mult = match schema.inner {
83 SchemaPiece::TimestampMilli => 1_000,
84 SchemaPiece::TimestampMicro => 1_000_000,
85 other => panic!("Invalid schema for timestamp: {:?}", other),
86 };
87 let ts_seconds = d
88 .timestamp()
89 .checked_mul(mult)
90 .expect("All chrono dates can be converted to timestamps");
91 let sub_part: i64 = if mult == 1_000 {
92 d.timestamp_subsec_millis().into()
93 } else {
94 d.timestamp_subsec_micros().into()
95 };
96 let ts = if ts_seconds >= 0 {
97 ts_seconds + sub_part
98 } else {
99 ts_seconds - sub_part
100 };
101 encode_long(ts, buffer)
102 }
103 Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
104 Value::Decimal(DecimalValue { unscaled, .. }) => match schema.name {
105 None => encode_bytes(unscaled, buffer),
106 Some(_) => buffer.extend(unscaled),
107 },
108 Value::Bytes(bytes) => encode_bytes(bytes, buffer),
109 Value::String(s) => match schema.inner {
110 SchemaPiece::String => {
111 encode_bytes(s, buffer);
112 }
113 SchemaPiece::Enum { symbols, .. } => {
114 if let Some(index) = symbols.iter().position(|item| item == s) {
115 encode_int(index as i32, buffer);
116 }
117 }
118 _ => (),
119 },
120 Value::Fixed(_, bytes) => buffer.extend(bytes),
121 Value::Enum(i, _) => encode_int(*i as i32, buffer),
122 Value::Union { index, inner, .. } => {
123 if let SchemaPiece::Union(schema_inner) = schema.inner {
124 let schema_inner = &schema_inner.variants()[*index];
125 encode_long(*index as i64, buffer);
126 encode_ref(&*inner, schema.step(schema_inner), buffer);
127 }
128 }
129 Value::Array(items) => {
130 if let SchemaPiece::Array(inner) = schema.inner {
131 if !items.is_empty() {
132 encode_long(items.len() as i64, buffer);
133 for item in items.iter() {
134 encode_ref(item, schema.step(&**inner), buffer);
135 }
136 }
137 buffer.push(0u8);
138 }
139 }
140 Value::Map(AvroMap(items)) => {
141 if let SchemaPiece::Map(inner) = schema.inner {
142 if !items.is_empty() {
143 encode_long(items.len() as i64, buffer);
144 for (key, value) in items {
145 encode_bytes(key, buffer);
146 encode_ref(value, schema.step(&**inner), buffer);
147 }
148 }
149 buffer.push(0u8);
150 }
151 }
152 Value::Record(fields) => {
153 if let SchemaPiece::Record {
154 fields: inner_fields,
155 ..
156 } = schema.inner
157 {
158 for (i, &(_, ref value)) in fields.iter().enumerate() {
159 encode_ref(value, schema.step(&inner_fields[i].schema), buffer);
160 }
161 }
162 }
163 Value::Json(j) => {
164 encode_bytes(&j.to_string(), buffer);
165 }
166 Value::Uuid(u) => {
167 let u_str = u.to_string();
168 encode_bytes(&u_str, buffer);
169 }
170 }
171}
172
173pub fn encode_to_vec(value: &Value, schema: &Schema) -> Vec<u8> {
174 let mut buffer = Vec::new();
175 encode(&value, schema, &mut buffer);
176 buffer
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// An array without items is encoded as just the terminating zero byte.
    #[test]
    fn test_encode_empty_array() {
        let schema: Schema = r#"{"type": "array", "items": "int"}"#.parse().unwrap();
        let mut encoded = Vec::new();

        encode(&Value::Array(Vec::new()), &schema, &mut encoded);

        assert_eq!(vec![0u8], encoded);
    }

    /// A map without entries is encoded as just the terminating zero byte.
    #[test]
    fn test_encode_empty_map() {
        let schema: Schema = r#"{"type": "map", "values": "int"}"#.parse().unwrap();
        let mut encoded = Vec::new();

        encode(&Value::Map(AvroMap(HashMap::new())), &schema, &mut encoded);

        assert_eq!(vec![0u8], encoded);
    }
}