1use super::{flags, BinaryDataType, SchemaLayout, HEADER_SIZE};
7use alloc::rc::Rc;
8use alloc::vec::Vec;
9use cynos_core::{Row, Value};
10
/// Encodes rows into a single binary buffer laid out according to a
/// [`SchemaLayout`].
///
/// Rows are appended with `encode_rows`; `finish` consumes the encoder and
/// returns the assembled bytes.
pub struct BinaryEncoder {
    /// Column layout that drives how each row is encoded.
    layout: SchemaLayout,
    /// Header bytes followed by the fixed-width row records.
    buffer: Vec<u8>,
    /// Variable-length payloads; appended after `buffer` by `finish`.
    var_buffer: Vec<u8>,
    /// Number of rows encoded so far.
    row_count: usize,
    /// Set once any encoded column value was `Value::Null` or missing.
    has_nulls: bool,
}
23
24impl BinaryEncoder {
25 pub fn new(layout: SchemaLayout, estimated_rows: usize) -> Self {
27 let fixed_size = layout.calculate_fixed_size(estimated_rows);
28 let var_estimate = estimated_rows * 32;
30
31 Self {
32 layout,
33 buffer: Vec::with_capacity(fixed_size),
34 var_buffer: Vec::with_capacity(var_estimate),
35 row_count: 0,
36 has_nulls: false,
37 }
38 }
39
40 pub fn encode_rows(&mut self, rows: &[Rc<Row>]) {
42 if self.buffer.is_empty() {
44 self.buffer.resize(HEADER_SIZE, 0);
45 }
46
47 for row in rows {
48 self.encode_row(row);
49 self.row_count += 1;
50 }
51 }
52
53 #[inline(always)]
55 fn encode_row(&mut self, row: &Row) {
56 let null_mask_size = self.layout.null_mask_size();
57 let num_columns = self.layout.columns().len();
58
59 let null_mask_start = self.buffer.len();
61 self.buffer.resize(null_mask_start + null_mask_size, 0);
62
63 for col_idx in 0..num_columns {
65 let col_layout = &self.layout.columns()[col_idx];
66 let data_type = col_layout.data_type;
67 let fixed_size = col_layout.fixed_size;
68
69 let value = row.get(col_idx);
70
71 if matches!(value, Some(Value::Null) | None) {
73 let byte_idx = col_idx / 8;
75 let bit_idx = col_idx % 8;
76 self.buffer[null_mask_start + byte_idx] |= 1 << bit_idx;
77 self.has_nulls = true;
78
79 self.buffer.extend(core::iter::repeat(0).take(fixed_size));
81 continue;
82 }
83
84 let value = value.unwrap();
85 self.encode_value_fast(value, data_type);
86 }
87 }
88
89 #[inline(always)]
91 fn encode_value_fast(&mut self, value: &Value, data_type: BinaryDataType) {
92 match (value, data_type) {
93 (Value::Boolean(b), BinaryDataType::Boolean) => {
94 self.buffer.push(if *b { 1 } else { 0 });
95 }
96 (Value::Int32(i), BinaryDataType::Int32) => {
97 self.write_bytes_fast(&i.to_le_bytes());
98 }
99 (Value::Int64(i), BinaryDataType::Int64) => {
100 let f = *i as f64;
102 self.write_bytes_fast(&f.to_le_bytes());
103 }
104 (Value::Float64(f), BinaryDataType::Float64) => {
105 self.write_bytes_fast(&f.to_le_bytes());
106 }
107 (Value::DateTime(ts), BinaryDataType::DateTime) => {
108 let f = *ts as f64;
110 self.write_bytes_fast(&f.to_le_bytes());
111 }
112 (Value::String(s), BinaryDataType::String) => {
113 self.write_varlen_fast(s.as_bytes());
114 }
115 (Value::Bytes(b), BinaryDataType::Bytes) => {
116 self.write_varlen_fast(b);
117 }
118 (Value::Jsonb(j), BinaryDataType::Jsonb) => {
119 self.write_varlen_fast(&j.0);
121 }
122 _ => {
124 let size = data_type.fixed_size();
125 self.buffer.extend(core::iter::repeat(0).take(size));
126 }
127 }
128 }
129
130 #[inline(always)]
132 fn write_bytes_fast(&mut self, bytes: &[u8]) {
133 let len = bytes.len();
134 let old_len = self.buffer.len();
135 self.buffer.reserve(len);
136
137 unsafe {
139 let dst = self.buffer.as_mut_ptr().add(old_len);
140 core::ptr::copy_nonoverlapping(bytes.as_ptr(), dst, len);
141 self.buffer.set_len(old_len + len);
142 }
143 }
144
145 #[inline(always)]
147 fn write_varlen_fast(&mut self, data: &[u8]) {
148 let offset = self.var_buffer.len() as u32;
149 let length = data.len() as u32;
150
151 self.write_bytes_fast(&offset.to_le_bytes());
153 self.write_bytes_fast(&length.to_le_bytes());
154
155 let old_len = self.var_buffer.len();
157 self.var_buffer.reserve(data.len());
158
159 unsafe {
161 let dst = self.var_buffer.as_mut_ptr().add(old_len);
162 core::ptr::copy_nonoverlapping(data.as_ptr(), dst, data.len());
163 self.var_buffer.set_len(old_len + data.len());
164 }
165 }
166
167 pub fn finish(mut self) -> Vec<u8> {
169 let var_offset = self.buffer.len() as u32;
171
172 let row_count = self.row_count as u32;
174 let row_stride = self.layout.row_stride() as u32;
175 let flags = if self.has_nulls { flags::HAS_NULLS } else { 0 };
176
177 self.buffer[0..4].copy_from_slice(&row_count.to_le_bytes());
179 self.buffer[4..8].copy_from_slice(&row_stride.to_le_bytes());
180 self.buffer[8..12].copy_from_slice(&var_offset.to_le_bytes());
181 self.buffer[12..16].copy_from_slice(&flags.to_le_bytes());
182
183 self.buffer.append(&mut self.var_buffer);
185
186 self.buffer
187 }
188
189 pub fn layout(&self) -> &SchemaLayout {
191 &self.layout
192 }
193}
194
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::string::String;
    use cynos_core::schema::{Column, Table};
    use cynos_core::DataType;

    /// Builds a three-column table: `id` (Int64), nullable `name` (String)
    /// and `value` (Float64).
    fn create_test_schema() -> Table {
        let columns = vec![
            Column::new(String::from("id"), DataType::Int64),
            Column::new(String::from("name"), DataType::String).nullable(true),
            Column::new(String::from("value"), DataType::Float64),
        ];

        Table::new(
            String::from("test"),
            columns,
            vec![],
            Default::default(),
            false,
        )
    }

    /// Reads the little-endian `u32` stored at `offset` in `bytes`.
    fn read_u32_le(bytes: &[u8], offset: usize) -> u32 {
        u32::from_le_bytes([
            bytes[offset],
            bytes[offset + 1],
            bytes[offset + 2],
            bytes[offset + 3],
        ])
    }

    #[test]
    fn test_schema_layout() {
        let table = create_test_schema();
        let layout = SchemaLayout::from_schema(&table);

        assert_eq!(layout.columns().len(), 3);
        assert_eq!(layout.null_mask_size(), 1);
        assert_eq!(layout.row_stride(), 25);
    }

    #[test]
    fn test_encode_simple_row() {
        let table = create_test_schema();
        let mut encoder = BinaryEncoder::new(SchemaLayout::from_schema(&table), 1);

        let values = vec![
            Value::Int64(42),
            Value::String(String::from("hello")),
            Value::Float64(3.14),
        ];
        let row = Rc::new(Row::new(1, values));

        encoder.encode_rows(&[row]);
        let encoded = encoder.finish();

        // Header word 0: row count.
        assert_eq!(read_u32_le(&encoded, 0), 1);

        // Header word 1: row stride.
        assert_eq!(read_u32_le(&encoded, 4), 25);
    }
}