Skip to main content

tell_encoding/
metric.rs

1use crate::MetricEntryParams;
2use crate::helpers::*;
3
4/// Encode a single MetricEntry FlatBuffer.
5///
6/// MetricEntry table fields (by ID):
7/// - field 0:  metric_type `u8`
8/// - field 1:  timestamp `u64`
9/// - field 2:  name `string` (required)
10/// - field 3:  value `double`
11/// - field 4:  source `string`
12/// - field 5:  service `string`
13/// - field 6:  labels `[Label]` (vector of tables)
14/// - field 7:  temporality `u8`
15/// - field 8:  histogram (not yet supported)
16/// - field 9:  session_id `[ubyte]`
17/// - field 10: int_labels (not yet supported)
18///
19/// Table layout (after soffset):
20/// +4:  name offset (u32)
21/// +8:  source offset (u32)
22/// +12: service offset (u32)
23/// +16: labels offset (u32)
24/// +20: int_labels offset (u32) — always 0
25/// +24: histogram offset (u32) — always 0
26/// +28: session_id offset (u32)
27/// +32: timestamp (u64)
28/// +40: value (f64)
29/// +48: metric_type (u8)
30/// +49: temporality (u8)
31/// +50-51: padding
32pub fn encode_metric_entry(params: &MetricEntryParams<'_>) -> Vec<u8> {
33    let mut buf = Vec::with_capacity(256);
34    encode_metric_entry_into(&mut buf, params);
35    buf
36}
37
38/// Encode a MetricData FlatBuffer containing a vector of pre-encoded metric entries.
39///
40/// MetricData table:
41/// - field 0: metrics `[MetricEntry]` (vector of tables)
42pub fn encode_metric_data(encoded_metrics: &[Vec<u8>]) -> Vec<u8> {
43    let vtable_size: u16 = 4 + 2;
44    let table_size: u16 = 8;
45
46    let metrics_total: usize = encoded_metrics.iter().map(|m| m.len() + 4).sum();
47    let estimated = 4 + vtable_size as usize + table_size as usize + 4 + metrics_total + 64;
48    let mut buf = Vec::with_capacity(estimated);
49
50    // Root offset placeholder
51    buf.extend_from_slice(&[0u8; 4]);
52
53    // VTable
54    let vtable_start = buf.len();
55    write_u16(&mut buf, vtable_size);
56    write_u16(&mut buf, table_size);
57    write_u16(&mut buf, 4); // field 0: metrics at table+4
58
59    // Align vtable (6 -> pad 2)
60    buf.extend_from_slice(&[0u8; 2]);
61
62    // Table
63    let table_start = buf.len();
64    let soffset = (table_start - vtable_start) as i32;
65    write_i32(&mut buf, soffset);
66
67    let metrics_off_pos = buf.len();
68    write_u32(&mut buf, 0);
69
70    align4(&mut buf);
71
72    // Metrics vector
73    let metrics_vec_start = buf.len();
74    let count = encoded_metrics.len();
75
76    write_u32(&mut buf, count as u32);
77
78    let offsets_start = buf.len();
79    for _ in 0..count {
80        write_u32(&mut buf, 0);
81    }
82
83    align4(&mut buf);
84
85    let mut table_positions = Vec::with_capacity(count);
86    for metric_bytes in encoded_metrics {
87        align4(&mut buf);
88
89        let metric_start = buf.len();
90        let root_offset = if metric_bytes.len() >= 4 {
91            u32::from_le_bytes([
92                metric_bytes[0],
93                metric_bytes[1],
94                metric_bytes[2],
95                metric_bytes[3],
96            ]) as usize
97        } else {
98            0
99        };
100
101        table_positions.push(metric_start + root_offset);
102        buf.extend_from_slice(metric_bytes);
103    }
104
105    for (i, &table_pos) in table_positions.iter().enumerate() {
106        let offset_pos = offsets_start + i * 4;
107        patch_offset(&mut buf, offset_pos, table_pos);
108    }
109
110    patch_offset(&mut buf, metrics_off_pos, metrics_vec_start);
111    buf[0..4].copy_from_slice(&(table_start as u32).to_le_bytes());
112
113    buf
114}
115
116/// Encode multiple metric entries directly into a caller-owned buffer as a MetricData FlatBuffer.
117///
118/// Zero-copy: writes the header first with reserved offset slots, then encodes
119/// entries directly in their final position. No intermediate allocations or copies.
120/// The caller can reuse `buf` across flushes via `buf.clear()`.
121///
122/// Returns the range `start..buf.len()` of the MetricData bytes within `buf`.
123pub fn encode_metric_data_into(
124    buf: &mut Vec<u8>,
125    metrics: &[MetricEntryParams<'_>],
126) -> std::ops::Range<usize> {
127    let data_start = buf.len();
128    let count = metrics.len();
129
130    // Header: root(4) + vtable(6+2pad) + table(8) + vec_len(4) + slots(4*N)
131    let root_pos = buf.len();
132    buf.extend_from_slice(&[0u8; 4]);
133
134    let vtable_start = buf.len();
135    write_u16(buf, 6); // vtable_size
136    write_u16(buf, 8); // table_size
137    write_u16(buf, 4); // field 0: metrics at table+4
138    buf.extend_from_slice(&[0u8; 2]); // align vtable
139
140    let table_start = buf.len();
141    write_i32(buf, (table_start - vtable_start) as i32);
142
143    let metrics_off_pos = buf.len();
144    write_u32(buf, 0);
145
146    align4(buf);
147
148    let metrics_vec_start = buf.len();
149    write_u32(buf, count as u32);
150
151    let offsets_start = buf.len();
152    for _ in 0..count {
153        write_u32(buf, 0);
154    }
155
156    align4(buf);
157
158    // Encode entries directly after header
159    let mut table_positions = Vec::with_capacity(count);
160    for params in metrics {
161        align4(buf);
162        let entry_start = buf.len();
163        encode_metric_entry_into(buf, params);
164        let root_offset = u32::from_le_bytes([
165            buf[entry_start],
166            buf[entry_start + 1],
167            buf[entry_start + 2],
168            buf[entry_start + 3],
169        ]) as usize;
170        table_positions.push(entry_start + root_offset);
171    }
172
173    // Patch vector offset slots
174    for (i, &table_pos) in table_positions.iter().enumerate() {
175        patch_offset(buf, offsets_start + i * 4, table_pos);
176    }
177
178    patch_offset(buf, metrics_off_pos, metrics_vec_start);
179    buf[root_pos..root_pos + 4].copy_from_slice(&((table_start - data_start) as u32).to_le_bytes());
180
181    data_start..buf.len()
182}
183
184/// Encode a single metric entry directly into an existing buffer.
185fn encode_metric_entry_into(buf: &mut Vec<u8>, params: &MetricEntryParams<'_>) {
186    let has_source = params.source.is_some();
187    let has_service = params.service.is_some();
188    let has_labels = !params.labels.is_empty();
189    let has_histogram = params.histogram.is_some();
190    let has_session_id = params.session_id.is_some();
191
192    // VTable: 4 + 11 field slots * 2 = 26 bytes, padded to 28
193    let vtable_size: u16 = 4 + 11 * 2;
194    // Table: soffset(4) + 7 offsets(28) + timestamp(8) + value(8) + metric_type(1) + temporality(1) + pad(2) = 52
195    let table_size: u16 = 52;
196
197    // Root offset placeholder
198    let root_pos = buf.len();
199    buf.extend_from_slice(&[0u8; 4]);
200
201    // VTable
202    let vtable_start = buf.len();
203    write_u16(buf, vtable_size);
204    write_u16(buf, table_size);
205
206    // Field slots (by field ID 0..10)
207    write_u16(buf, 48); // field 0: metric_type
208    write_u16(buf, 32); // field 1: timestamp
209    write_u16(buf, 4); // field 2: name (always present, required)
210    write_u16(buf, 40); // field 3: value
211    write_u16(buf, if has_source { 8 } else { 0 }); // field 4: source
212    write_u16(buf, if has_service { 12 } else { 0 }); // field 5: service
213    write_u16(buf, if has_labels { 16 } else { 0 }); // field 6: labels
214    write_u16(buf, 49); // field 7: temporality
215    write_u16(buf, if has_histogram { 24 } else { 0 }); // field 8: histogram
216    write_u16(buf, if has_session_id { 28 } else { 0 }); // field 9: session_id
217    write_u16(buf, 0); // field 10: int_labels (not supported)
218
219    // Align vtable (26 -> pad 2)
220    buf.extend_from_slice(&[0u8; 2]);
221
222    // Table
223    let table_start = buf.len();
224    let soffset = (table_start - vtable_start) as i32;
225    write_i32(buf, soffset);
226
227    // Offset placeholders
228    let name_off_pos = buf.len();
229    write_u32(buf, 0); // +4: name
230
231    let source_off_pos = buf.len();
232    write_u32(buf, 0); // +8: source
233
234    let service_off_pos = buf.len();
235    write_u32(buf, 0); // +12: service
236
237    let labels_off_pos = buf.len();
238    write_u32(buf, 0); // +16: labels
239
240    write_u32(buf, 0); // +20: int_labels (always 0)
241
242    let histogram_off_pos = buf.len();
243    write_u32(buf, 0); // +24: histogram
244
245    let session_id_off_pos = buf.len();
246    write_u32(buf, 0); // +28: session_id
247
248    // Inline scalars
249    write_u64(buf, params.timestamp); // +32: timestamp
250    write_f64(buf, params.value); // +40: value
251    buf.push(params.metric_type.as_u8()); // +48: metric_type
252    buf.push(params.temporality.as_u8()); // +49: temporality
253    buf.extend_from_slice(&[0u8; 2]); // +50: padding
254
255    // Variable-length data
256    align4(buf);
257
258    // name (required)
259    let name_start = write_string(buf, params.name);
260    align4(buf);
261
262    // source
263    let source_start = params.source.map(|s| write_string(buf, s));
264    align4(buf);
265
266    // service
267    let service_start = params.service.map(|s| write_string(buf, s));
268    align4(buf);
269
270    // session_id
271    let session_id_start = params.session_id.map(|id| write_byte_vector(buf, id));
272    align4(buf);
273
274    // labels (vector of Label tables)
275    let labels_start = if has_labels {
276        Some(encode_labels(buf, params.labels))
277    } else {
278        None
279    };
280
281    // histogram (sub-table)
282    let histogram_start = params.histogram.map(|h| encode_histogram(buf, h));
283
284    // Patch offsets
285    buf[root_pos..root_pos + 4].copy_from_slice(&((table_start - root_pos) as u32).to_le_bytes());
286    patch_offset(buf, name_off_pos, name_start);
287
288    if let Some(start) = source_start {
289        patch_offset(buf, source_off_pos, start);
290    }
291    if let Some(start) = service_start {
292        patch_offset(buf, service_off_pos, start);
293    }
294    if let Some(start) = session_id_start {
295        patch_offset(buf, session_id_off_pos, start);
296    }
297    if let Some(start) = labels_start {
298        patch_offset(buf, labels_off_pos, start);
299    }
300    if let Some(start) = histogram_start {
301        patch_offset(buf, histogram_off_pos, start);
302    }
303}
304
305/// Encode a vector of Label tables into the buffer.
306///
307/// Label table fields:
308/// - field 0: key `string` (required)
309/// - field 1: value `string` (required)
310///
311/// Returns the start position of the vector (for offset patching).
312fn encode_labels(buf: &mut Vec<u8>, labels: &[crate::LabelParam<'_>]) -> usize {
313    let count = labels.len();
314
315    // Write vector header: [count][offset_slot_0][offset_slot_1]...
316    let vec_start = buf.len();
317    write_u32(buf, count as u32);
318
319    let offsets_start = buf.len();
320    for _ in 0..count {
321        write_u32(buf, 0);
322    }
323
324    align4(buf);
325
326    // Write each Label table, tracking table positions
327    let mut table_positions = Vec::with_capacity(count);
328    for label in labels {
329        align4(buf);
330
331        // Label VTable: size(2) + table_size(2) + 2 fields(4) = 8 bytes
332        let vtable_start = buf.len();
333        write_u16(buf, 8); // vtable_size
334        write_u16(buf, 12); // table_size: soffset(4) + key_off(4) + value_off(4)
335        write_u16(buf, 4); // field 0: key at table+4
336        write_u16(buf, 8); // field 1: value at table+8
337
338        // Label Table
339        let table_start = buf.len();
340        table_positions.push(table_start);
341        let soffset = (table_start - vtable_start) as i32;
342        write_i32(buf, soffset);
343
344        let key_off_pos = buf.len();
345        write_u32(buf, 0);
346
347        let value_off_pos = buf.len();
348        write_u32(buf, 0);
349
350        // Key string
351        align4(buf);
352        let key_start = write_string(buf, label.key);
353
354        // Value string
355        align4(buf);
356        let value_start = write_string(buf, label.value);
357
358        // Patch label offsets
359        patch_offset(buf, key_off_pos, key_start);
360        patch_offset(buf, value_off_pos, value_start);
361    }
362
363    // Patch vector offset slots
364    for (i, &table_pos) in table_positions.iter().enumerate() {
365        patch_offset(buf, offsets_start + i * 4, table_pos);
366    }
367
368    vec_start
369}
370
371/// Encode a Histogram sub-table.
372///
373/// Histogram table fields:
374/// - field 0: count `u64`
375/// - field 1: sum `f64`
376/// - field 2: buckets `[Bucket]` (vector of tables)
377/// - field 3: min `f64`
378/// - field 4: max `f64`
379///
380/// Returns the table start position.
381fn encode_histogram(buf: &mut Vec<u8>, h: &crate::HistogramParams) -> usize {
382    let has_buckets = !h.buckets.is_empty();
383
384    // VTable: 4 + 5 fields * 2 = 14 bytes, aligned to 16
385    let vtable_start = buf.len();
386    write_u16(buf, 14); // vtable_size
387    // table: soffset(4) + count(8) + sum(8) + buckets_offset(4) + min(8) + max(8) = 40
388    write_u16(buf, 40);
389    write_u16(buf, 4); // field 0: count at +4
390    write_u16(buf, 12); // field 1: sum at +12
391    write_u16(buf, if has_buckets { 20 } else { 0 }); // field 2: buckets at +20
392    write_u16(buf, 24); // field 3: min at +24
393    write_u16(buf, 32); // field 4: max at +32
394
395    align4(buf);
396
397    // Table
398    let table_start = buf.len();
399    write_i32(buf, (table_start - vtable_start) as i32);
400
401    // count (u64) at +4
402    write_u64(buf, h.count);
403    // sum (f64) at +12
404    write_f64(buf, h.sum);
405
406    // buckets offset placeholder at +20
407    let buckets_off_pos = buf.len();
408    write_u32(buf, 0);
409
410    // min (f64) at +24
411    write_f64(buf, h.min);
412    // max (f64) at +32
413    write_f64(buf, h.max);
414
415    align4(buf);
416
417    // Buckets vector
418    if has_buckets {
419        let start = encode_buckets(buf, &h.buckets);
420        patch_offset(buf, buckets_off_pos, start);
421    }
422
423    table_start
424}
425
426/// Encode a vector of Bucket tables.
427///
428/// Bucket table fields:
429/// - field 0: upper_bound `f64`
430/// - field 1: count `u64`
431fn encode_buckets(buf: &mut Vec<u8>, buckets: &[(f64, u64)]) -> usize {
432    let vec_start = buf.len();
433    write_u32(buf, buckets.len() as u32);
434
435    let slots_start = buf.len();
436    for _ in 0..buckets.len() {
437        write_u32(buf, 0);
438    }
439    align4(buf);
440
441    let mut offsets = Vec::with_capacity(buckets.len());
442    for &(upper_bound, count) in buckets {
443        // Bucket VTable: 4 + 2*2 = 8 bytes
444        let vtable_start = buf.len();
445        write_u16(buf, 8); // vtable_size
446        write_u16(buf, 20); // table_size: soffset(4) + upper_bound(8) + count(8)
447        write_u16(buf, 4); // field 0: upper_bound at +4
448        write_u16(buf, 12); // field 1: count at +12
449
450        align4(buf);
451
452        let table_start = buf.len();
453        write_i32(buf, (table_start - vtable_start) as i32);
454
455        // upper_bound (f64)
456        write_f64(buf, upper_bound);
457        // count (u64)
458        write_u64(buf, count);
459
460        align4(buf);
461        offsets.push(table_start);
462    }
463
464    for (i, &offset) in offsets.iter().enumerate() {
465        patch_offset(buf, slots_start + i * 4, offset);
466    }
467
468    vec_start
469}