Skip to main content

cynos_binary/
encoder.rs

//! Binary encoder for high-performance row serialization.
//!
//! Encodes rows into a compact binary format that can be read directly
//! from WASM linear memory using a JavaScript `DataView`.
5
6use super::{flags, BinaryDataType, SchemaLayout, HEADER_SIZE};
7use alloc::rc::Rc;
8use alloc::vec::Vec;
9use cynos_core::{Row, Value};
10
11/// High-performance binary encoder
12pub struct BinaryEncoder {
13    layout: SchemaLayout,
14    /// Fixed section buffer (header + rows)
15    buffer: Vec<u8>,
16    /// Variable-length data buffer
17    var_buffer: Vec<u8>,
18    /// Number of rows encoded
19    row_count: usize,
20    /// Whether any NULL values were encountered
21    has_nulls: bool,
22}
23
24impl BinaryEncoder {
25    /// Create a new encoder with pre-allocated buffers
26    pub fn new(layout: SchemaLayout, estimated_rows: usize) -> Self {
27        let fixed_size = layout.calculate_fixed_size(estimated_rows);
28        // Estimate variable data: ~32 bytes per row average
29        let var_estimate = estimated_rows * 32;
30
31        Self {
32            layout,
33            buffer: Vec::with_capacity(fixed_size),
34            var_buffer: Vec::with_capacity(var_estimate),
35            row_count: 0,
36            has_nulls: false,
37        }
38    }
39
40    /// Encode a batch of rows
41    pub fn encode_rows(&mut self, rows: &[Rc<Row>]) {
42        // Reserve space for header (will be written at the end)
43        if self.buffer.is_empty() {
44            self.buffer.resize(HEADER_SIZE, 0);
45        }
46
47        for row in rows {
48            self.encode_row(row);
49            self.row_count += 1;
50        }
51    }
52
53    /// Encode a single row
54    #[inline(always)]
55    fn encode_row(&mut self, row: &Row) {
56        let null_mask_size = self.layout.null_mask_size();
57        let num_columns = self.layout.columns().len();
58
59        // Reserve space for null_mask
60        let null_mask_start = self.buffer.len();
61        self.buffer.resize(null_mask_start + null_mask_size, 0);
62
63        // Encode each column value
64        for col_idx in 0..num_columns {
65            let col_layout = &self.layout.columns()[col_idx];
66            let data_type = col_layout.data_type;
67            let fixed_size = col_layout.fixed_size;
68
69            let value = row.get(col_idx);
70
71            // Handle NULL
72            if matches!(value, Some(Value::Null) | None) {
73                // Set null bit
74                let byte_idx = col_idx / 8;
75                let bit_idx = col_idx % 8;
76                self.buffer[null_mask_start + byte_idx] |= 1 << bit_idx;
77                self.has_nulls = true;
78
79                // Write zero bytes for the column
80                self.buffer.extend(core::iter::repeat(0).take(fixed_size));
81                continue;
82            }
83
84            let value = value.unwrap();
85            self.encode_value_fast(value, data_type);
86        }
87    }
88
89    /// Encode a single value - optimized with direct memory writes
90    #[inline(always)]
91    fn encode_value_fast(&mut self, value: &Value, data_type: BinaryDataType) {
92        match (value, data_type) {
93            (Value::Boolean(b), BinaryDataType::Boolean) => {
94                self.buffer.push(if *b { 1 } else { 0 });
95            }
96            (Value::Int32(i), BinaryDataType::Int32) => {
97                self.write_bytes_fast(&i.to_le_bytes());
98            }
99            (Value::Int64(i), BinaryDataType::Int64) => {
100                // Store as f64 for JS Number compatibility
101                let f = *i as f64;
102                self.write_bytes_fast(&f.to_le_bytes());
103            }
104            (Value::Float64(f), BinaryDataType::Float64) => {
105                self.write_bytes_fast(&f.to_le_bytes());
106            }
107            (Value::DateTime(ts), BinaryDataType::DateTime) => {
108                // Store as f64 (milliseconds)
109                let f = *ts as f64;
110                self.write_bytes_fast(&f.to_le_bytes());
111            }
112            (Value::String(s), BinaryDataType::String) => {
113                self.write_varlen_fast(s.as_bytes());
114            }
115            (Value::Bytes(b), BinaryDataType::Bytes) => {
116                self.write_varlen_fast(b);
117            }
118            (Value::Jsonb(j), BinaryDataType::Jsonb) => {
119                // JsonbValue stores JSON as bytes already
120                self.write_varlen_fast(&j.0);
121            }
122            // Type mismatch - write zeros
123            _ => {
124                let size = data_type.fixed_size();
125                self.buffer.extend(core::iter::repeat(0).take(size));
126            }
127        }
128    }
129
130    /// Write bytes directly using copy_nonoverlapping for maximum performance
131    #[inline(always)]
132    fn write_bytes_fast(&mut self, bytes: &[u8]) {
133        let len = bytes.len();
134        let old_len = self.buffer.len();
135        self.buffer.reserve(len);
136
137        // SAFETY: We just reserved enough space, and we're writing valid bytes
138        unsafe {
139            let dst = self.buffer.as_mut_ptr().add(old_len);
140            core::ptr::copy_nonoverlapping(bytes.as_ptr(), dst, len);
141            self.buffer.set_len(old_len + len);
142        }
143    }
144
145    /// Write variable-length data and store (offset, length) in fixed section
146    #[inline(always)]
147    fn write_varlen_fast(&mut self, data: &[u8]) {
148        let offset = self.var_buffer.len() as u32;
149        let length = data.len() as u32;
150
151        // Write offset and length to fixed section using fast path
152        self.write_bytes_fast(&offset.to_le_bytes());
153        self.write_bytes_fast(&length.to_le_bytes());
154
155        // Append data to variable buffer using fast path
156        let old_len = self.var_buffer.len();
157        self.var_buffer.reserve(data.len());
158
159        // SAFETY: We just reserved enough space
160        unsafe {
161            let dst = self.var_buffer.as_mut_ptr().add(old_len);
162            core::ptr::copy_nonoverlapping(data.as_ptr(), dst, data.len());
163            self.var_buffer.set_len(old_len + data.len());
164        }
165    }
166
167    /// Finalize encoding and return the complete buffer
168    pub fn finish(mut self) -> Vec<u8> {
169        // Calculate var_offset (where variable section starts)
170        let var_offset = self.buffer.len() as u32;
171
172        // Write header
173        let row_count = self.row_count as u32;
174        let row_stride = self.layout.row_stride() as u32;
175        let flags = if self.has_nulls { flags::HAS_NULLS } else { 0 };
176
177        // Header: row_count (4) + row_stride (4) + var_offset (4) + flags (4)
178        self.buffer[0..4].copy_from_slice(&row_count.to_le_bytes());
179        self.buffer[4..8].copy_from_slice(&row_stride.to_le_bytes());
180        self.buffer[8..12].copy_from_slice(&var_offset.to_le_bytes());
181        self.buffer[12..16].copy_from_slice(&flags.to_le_bytes());
182
183        // Append variable section
184        self.buffer.append(&mut self.var_buffer);
185
186        self.buffer
187    }
188
189    /// Get the schema layout
190    pub fn layout(&self) -> &SchemaLayout {
191        &self.layout
192    }
193}
194
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::string::String;
    use cynos_core::schema::{Column, Table};
    use cynos_core::DataType;

    /// Builds a three-column table: `id: Int64`, nullable `name: String`,
    /// `value: Float64`.
    fn create_test_schema() -> Table {
        Table::new(
            String::from("test"),
            vec![
                Column::new(String::from("id"), DataType::Int64),
                Column::new(String::from("name"), DataType::String).nullable(true),
                Column::new(String::from("value"), DataType::Float64),
            ],
            vec![],
            Default::default(),
            false,
        )
    }

    /// Reads a little-endian `u32` starting at byte `at`.
    fn read_u32(buf: &[u8], at: usize) -> u32 {
        let mut raw = [0u8; 4];
        raw.copy_from_slice(&buf[at..at + 4]);
        u32::from_le_bytes(raw)
    }

    /// Reads a little-endian `f64` starting at byte `at`.
    fn read_f64(buf: &[u8], at: usize) -> f64 {
        let mut raw = [0u8; 8];
        raw.copy_from_slice(&buf[at..at + 8]);
        f64::from_le_bytes(raw)
    }

    #[test]
    fn test_schema_layout() {
        let schema = create_test_schema();
        let layout = SchemaLayout::from_schema(&schema);

        assert_eq!(layout.columns().len(), 3);
        assert_eq!(layout.null_mask_size(), 1); // ceil(3/8) = 1
        // row_stride = null_mask(1) + id(8) + name(8) + value(8) = 25
        assert_eq!(layout.row_stride(), 25);
    }

    #[test]
    fn test_encode_simple_row() {
        let schema = create_test_schema();
        let layout = SchemaLayout::from_schema(&schema);
        let mut encoder = BinaryEncoder::new(layout, 1);

        let row = Rc::new(Row::new(
            1,
            vec![
                Value::Int64(42),
                Value::String(String::from("hello")),
                Value::Float64(2.5),
            ],
        ));

        encoder.encode_rows(&[row]);
        let buffer = encoder.finish();

        // Header: row_count, row_stride, var_offset, flags.
        assert_eq!(read_u32(&buffer, 0), 1);
        assert_eq!(read_u32(&buffer, 4), 25);
        assert_eq!(read_u32(&buffer, 8) as usize, HEADER_SIZE + 25);
        assert_eq!(read_u32(&buffer, 12), 0);

        // One 25-byte row plus the 5-byte "hello" payload.
        assert_eq!(buffer.len(), HEADER_SIZE + 25 + 5);

        // Row: null mask, then the three column slots in order.
        let row_start = HEADER_SIZE;
        assert_eq!(buffer[row_start], 0);
        assert_eq!(read_f64(&buffer, row_start + 1), 42.0);
        assert_eq!(read_u32(&buffer, row_start + 9), 0); // name offset
        assert_eq!(read_u32(&buffer, row_start + 13), 5); // name length
        assert_eq!(read_f64(&buffer, row_start + 17), 2.5);

        // Variable section holds the string bytes.
        assert_eq!(&buffer[HEADER_SIZE + 25..], &b"hello"[..]);
    }

    #[test]
    fn test_encode_null_value() {
        let schema = create_test_schema();
        let layout = SchemaLayout::from_schema(&schema);
        let mut encoder = BinaryEncoder::new(layout, 1);

        let row = Rc::new(Row::new(
            1,
            vec![Value::Int64(7), Value::Null, Value::Float64(1.5)],
        ));

        encoder.encode_rows(&[row]);
        let buffer = encoder.finish();

        // HAS_NULLS is the only flag the encoder sets.
        assert_eq!(&buffer[12..16], &flags::HAS_NULLS.to_le_bytes()[..]);

        // No variable-length payload was written.
        assert_eq!(buffer.len(), HEADER_SIZE + 25);

        // Column 1 (`name`) is NULL: bit 1 of the mask, zero-filled slot.
        let row_start = HEADER_SIZE;
        assert_eq!(buffer[row_start], 0b0000_0010);
        assert_eq!(read_f64(&buffer, row_start + 1), 7.0);
        assert!(buffer[row_start + 9..row_start + 17].iter().all(|&b| b == 0));
        assert_eq!(read_f64(&buffer, row_start + 17), 1.5);
    }
}