Skip to main content

tell_encoding/
batch.rs

1use crate::helpers::*;
2use crate::{BatchParams, API_KEY_LENGTH, DEFAULT_VERSION};
3
4/// Encode a FlatBuffer Batch message.
5///
6/// Layout:
7/// ```text
8/// [4 bytes: root offset] -> points to table
9/// [vtable]
10///   - vtable_size (u16)
11///   - table_size (u16)
12///   - field offsets (u16 each, 0 = not present)
13/// [table]
14///   - soffset to vtable (i32)
15///   - inline scalars and vector offsets
16/// [vectors]
17///   - length (u32) + data bytes
18/// ```
19///
20/// Batch fields:
21/// - field 0: api_key `[ubyte]` (required)
22/// - field 1: schema_type `u8`
23/// - field 2: version `u8`
24/// - field 3: batch_id `u64`
25/// - field 4: data `[ubyte]` (required)
26/// - field 5: source_ip `[ubyte]` (not used by SDKs)
27pub fn encode_batch(params: &BatchParams<'_>) -> Vec<u8> {
28    let mut buf = Vec::new();
29    encode_batch_into(&mut buf, params);
30    buf
31}
32
33/// Encode a Batch into a caller-owned buffer (avoids allocation when buffer is reused).
34pub fn encode_batch_into(buf: &mut Vec<u8>, params: &BatchParams<'_>) {
35    let has_batch_id = params.batch_id != 0;
36    let version = if params.version == 0 { DEFAULT_VERSION } else { params.version };
37
38    // VTable: size(u16) + table_size(u16) + 6 field slots (u16 each) = 16 bytes
39    let vtable_size: u16 = 4 + 6 * 2;
40
41    // Fixed table layout (32 bytes total):
42    // soffset (4) + api_key_off (4) + data_off (4) + source_ip_off (4) + batch_id (8) + schema_type (1) + version (1) + pad (2) = 28 after soffset
43    let table_size: u16 = 4 + 28;
44
45    let api_key_vec_size = 4 + API_KEY_LENGTH;
46    let data_vec_size = 4 + params.data.len();
47
48    let estimated = 4 + vtable_size as usize + table_size as usize + api_key_vec_size + data_vec_size + 16;
49    buf.reserve(estimated);
50
51    let base = buf.len();
52
53    // Root offset placeholder
54    buf.extend_from_slice(&[0u8; 4]);
55
56    // VTable
57    let vtable_start = buf.len();
58    write_u16(buf, vtable_size);
59    write_u16(buf, table_size);
60
61    // Field offsets
62    write_u16(buf, 4);                                              // field 0: api_key at table+4
63    write_u16(buf, 24);                                             // field 1: schema_type at table+24
64    write_u16(buf, 25);                                             // field 2: version at table+25
65    write_u16(buf, if has_batch_id { 16 } else { 0 });             // field 3: batch_id at table+16
66    write_u16(buf, 8);                                              // field 4: data at table+8
67    write_u16(buf, 0);                                              // field 5: source_ip (not used)
68
69    // Table
70    let table_start = buf.len();
71    let soffset = (table_start - vtable_start) as i32;
72    write_i32(buf, soffset);
73
74    // api_key offset placeholder
75    let api_key_off_pos = buf.len();
76    write_u32(buf, 0);
77
78    // data offset placeholder
79    let data_off_pos = buf.len();
80    write_u32(buf, 0);
81
82    // source_ip offset placeholder (unused)
83    write_u32(buf, 0);
84
85    // batch_id (u64)
86    write_u64(buf, params.batch_id);
87
88    // schema_type (u8)
89    buf.push(params.schema_type.as_u8());
90
91    // version (u8)
92    buf.push(version);
93
94    // padding (2 bytes)
95    buf.extend_from_slice(&[0u8; 2]);
96
97    // Vectors
98    align4(buf);
99
100    // api_key vector
101    let api_key_vec_start = write_byte_vector(buf, params.api_key);
102
103    align4(buf);
104
105    // data vector
106    let data_vec_start = write_byte_vector(buf, params.data);
107
108    // Fill in offsets
109    buf[base..base + 4].copy_from_slice(&(table_start as u32).to_le_bytes());
110    patch_offset(buf, api_key_off_pos, api_key_vec_start);
111    patch_offset(buf, data_off_pos, data_vec_start);
112}