Skip to main content

tell_encoding/
batch.rs

1use crate::helpers::*;
2use crate::{API_KEY_LENGTH, BatchParams, DEFAULT_VERSION};
3
4/// Encode a FlatBuffer Batch message.
5///
6/// Layout:
7/// ```text
8/// [4 bytes: root offset] -> points to table
9/// [vtable]
10///   - vtable_size (u16)
11///   - table_size (u16)
12///   - field offsets (u16 each, 0 = not present)
13/// [table]
14///   - soffset to vtable (i32)
15///   - inline scalars and vector offsets
16/// [vectors]
17///   - length (u32) + data bytes
18/// ```
19///
20/// Batch fields:
21/// - field 0: api_key `[ubyte]` (required)
22/// - field 1: schema_type `u8`
23/// - field 2: version `u8`
24/// - field 3: batch_id `u64`
25/// - field 4: data `[ubyte]` (required)
26/// - field 5: source_ip `[ubyte]` (not used by SDKs)
27pub fn encode_batch(params: &BatchParams<'_>) -> Vec<u8> {
28    let mut buf = Vec::new();
29    encode_batch_into(&mut buf, params);
30    buf
31}
32
33/// Encode a Batch into a caller-owned buffer (avoids allocation when buffer is reused).
34pub fn encode_batch_into(buf: &mut Vec<u8>, params: &BatchParams<'_>) {
35    let has_batch_id = params.batch_id != 0;
36    let version = if params.version == 0 {
37        DEFAULT_VERSION
38    } else {
39        params.version
40    };
41
42    // VTable: size(u16) + table_size(u16) + 6 field slots (u16 each) = 16 bytes
43    let vtable_size: u16 = 4 + 6 * 2;
44
45    // Fixed table layout (32 bytes total):
46    // soffset (4) + api_key_off (4) + data_off (4) + source_ip_off (4) + batch_id (8) + schema_type (1) + version (1) + pad (2) = 28 after soffset
47    let table_size: u16 = 4 + 28;
48
49    let api_key_vec_size = 4 + API_KEY_LENGTH;
50    let data_vec_size = 4 + params.data.len();
51
52    let estimated =
53        4 + vtable_size as usize + table_size as usize + api_key_vec_size + data_vec_size + 16;
54    buf.reserve(estimated);
55
56    let base = buf.len();
57
58    // Root offset placeholder
59    buf.extend_from_slice(&[0u8; 4]);
60
61    // VTable
62    let vtable_start = buf.len();
63    write_u16(buf, vtable_size);
64    write_u16(buf, table_size);
65
66    // Field offsets
67    write_u16(buf, 4); // field 0: api_key at table+4
68    write_u16(buf, 24); // field 1: schema_type at table+24
69    write_u16(buf, 25); // field 2: version at table+25
70    write_u16(buf, if has_batch_id { 16 } else { 0 }); // field 3: batch_id at table+16
71    write_u16(buf, 8); // field 4: data at table+8
72    write_u16(buf, 0); // field 5: source_ip (not used)
73
74    // Table
75    let table_start = buf.len();
76    let soffset = (table_start - vtable_start) as i32;
77    write_i32(buf, soffset);
78
79    // api_key offset placeholder
80    let api_key_off_pos = buf.len();
81    write_u32(buf, 0);
82
83    // data offset placeholder
84    let data_off_pos = buf.len();
85    write_u32(buf, 0);
86
87    // source_ip offset placeholder (unused)
88    write_u32(buf, 0);
89
90    // batch_id (u64)
91    write_u64(buf, params.batch_id);
92
93    // schema_type (u8)
94    buf.push(params.schema_type.as_u8());
95
96    // version (u8)
97    buf.push(version);
98
99    // padding (2 bytes)
100    buf.extend_from_slice(&[0u8; 2]);
101
102    // Vectors
103    align4(buf);
104
105    // api_key vector
106    let api_key_vec_start = write_byte_vector(buf, params.api_key);
107
108    align4(buf);
109
110    // data vector
111    let data_vec_start = write_byte_vector(buf, params.data);
112
113    // Fill in offsets
114    buf[base..base + 4].copy_from_slice(&(table_start as u32).to_le_bytes());
115    patch_offset(buf, api_key_off_pos, api_key_vec_start);
116    patch_offset(buf, data_off_pos, data_vec_start);
117}