1use crate::schema::{FieldDef, SchemaEntry};
4use crate::types::{FieldType, FieldValue, FieldValueRef};
5use std::io::{self, Write};
6
/// Numeric type identifier carried by schema and event frames.
///
/// Written on the wire as a little-endian `u16` immediately after the frame
/// tag; event decoding resolves it to a [`SchemaInfo`] through the caller's
/// schema lookup.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct WireTypeId(pub u16);
14
/// Magic bytes opening every stream: ASCII `"TRC"` followed by a NUL byte.
pub const MAGIC: [u8; 4] = *b"TRC\0";

/// Wire-format version byte that `encode_header` writes after [`MAGIC`].
pub const VERSION: u8 = 1;

/// Header length in bytes: the magic followed by a single version byte.
pub const HEADER_SIZE: usize = MAGIC.len() + 1;
18
// Leading byte of each frame; `decode_frame` dispatches on it. Tag `0x04` is
// not assigned by this module.

/// Frame tag: schema declaration (see `encode_schema`).
pub const TAG_SCHEMA: u8 = 0x01;
/// Frame tag: event record (see `encode_event`).
pub const TAG_EVENT: u8 = 0x02;
/// Frame tag: string-pool batch (see `encode_string_pool`).
pub const TAG_STRING_POOL: u8 = 0x03;
/// Frame tag: timestamp reset, followed by a little-endian `u64` payload.
pub const TAG_TIMESTAMP_RESET: u8 = 0x05;
24
/// Largest timestamp delta, in nanoseconds, representable in the 24-bit field
/// that follows the type id in an event frame.
pub const MAX_TIMESTAMP_DELTA_NS: u64 = 0xFF_FFFF;

/// Writes `value` as three little-endian bytes.
///
/// # Errors
///
/// Returns [`io::ErrorKind::InvalidInput`], without writing anything, if
/// `value` exceeds [`MAX_TIMESTAMP_DELTA_NS`]. Also propagates any error from `w`.
#[inline]
pub fn encode_u24_le(value: u32, w: &mut impl Write) -> io::Result<()> {
    // A `debug_assert!` alone is compiled out of release builds, where the top
    // byte would be dropped silently and the decoded timestamp corrupted.
    if u64::from(value) > MAX_TIMESTAMP_DELTA_NS {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("{:#x} does not fit in 24 bits", value),
        ));
    }
    let [b0, b1, b2, _] = value.to_le_bytes();
    w.write_all(&[b0, b1, b2])
}
34
/// Reads a 24-bit little-endian unsigned integer from the first three bytes
/// of `data`.
///
/// Returns `None` when `data` holds fewer than three bytes; anything after the
/// third byte is ignored.
#[inline]
pub fn decode_u24_le(data: &[u8]) -> Option<u32> {
    match data {
        [lo, mid, hi, ..] => Some(u32::from_le_bytes([*lo, *mid, *hi, 0])),
        _ => None,
    }
}
41
/// One owned string-pool entry: an identifier plus its raw bytes.
#[derive(Clone, Debug, PartialEq)]
pub struct PoolEntry {
    /// Identifier of this entry within the pool.
    pub pool_id: u32,
    /// Raw entry bytes; this module copies them verbatim and does not check
    /// them for UTF-8.
    pub data: Vec<u8>,
}
47
48#[derive(Debug, Clone, PartialEq)]
49pub enum Frame {
50 Schema {
51 type_id: WireTypeId,
52 entry: SchemaEntry,
53 },
54 Event {
55 type_id: WireTypeId,
56 timestamp_ns: Option<u64>,
58 values: Vec<FieldValue>,
59 },
60 StringPool(Vec<PoolEntry>),
61 TimestampReset(u64),
62}
63
/// Zero-copy string-pool entry whose bytes borrow from the decoded buffer.
#[derive(Clone, Debug, PartialEq)]
pub struct PoolEntryRef<'a> {
    /// Identifier of this entry within the pool.
    pub pool_id: u32,
    /// Raw entry bytes, borrowed from the input passed to the decoder.
    pub data: &'a [u8],
}
70
71#[derive(Debug, Clone, PartialEq)]
73pub enum FrameRef<'a> {
74 Schema {
75 type_id: WireTypeId,
76 entry: SchemaEntry,
77 },
78 Event {
79 type_id: WireTypeId,
80 timestamp_ns: Option<u64>,
81 values: Vec<FieldValueRef<'a>>,
82 },
83 StringPool(Vec<PoolEntryRef<'a>>),
84 TimestampReset(u64),
85}
86
87pub struct SchemaInfo<'a> {
89 pub field_types: &'a [FieldType],
90 pub has_timestamp: bool,
91}
92
93pub fn encode_header(w: &mut impl Write) -> io::Result<()> {
96 w.write_all(&MAGIC)?;
97 w.write_all(&[VERSION])
98}
99
100pub fn encode_schema(
101 type_id: WireTypeId,
102 entry: &SchemaEntry,
103 w: &mut impl Write,
104) -> io::Result<()> {
105 w.write_all(&[TAG_SCHEMA])?;
106 w.write_all(&type_id.0.to_le_bytes())?;
107 let name_bytes = entry.name.as_bytes();
108 w.write_all(&(name_bytes.len() as u16).to_le_bytes())?;
109 w.write_all(name_bytes)?;
110 w.write_all(&[if entry.has_timestamp { 1 } else { 0 }])?;
111 w.write_all(&(entry.fields.len() as u16).to_le_bytes())?;
112 for f in &entry.fields {
113 let fname = f.name.as_bytes();
114 w.write_all(&(fname.len() as u16).to_le_bytes())?;
115 w.write_all(fname)?;
116 w.write_all(&[f.field_type as u8])?;
117 }
118 Ok(())
119}
120
121pub fn encode_event(
124 type_id: WireTypeId,
125 timestamp_delta_ns: Option<u32>,
126 values: &[FieldValue],
127 w: &mut impl Write,
128) -> io::Result<()> {
129 w.write_all(&[TAG_EVENT])?;
130 w.write_all(&type_id.0.to_le_bytes())?;
131 if let Some(delta) = timestamp_delta_ns {
132 encode_u24_le(delta, w)?;
133 }
134 for v in values {
135 v.encode(w)?;
136 }
137 Ok(())
138}
139
140pub fn encode_string_pool(entries: &[PoolEntry], w: &mut impl Write) -> io::Result<()> {
141 w.write_all(&[TAG_STRING_POOL])?;
142 w.write_all(&(entries.len() as u32).to_le_bytes())?;
143 for e in entries {
144 w.write_all(&e.pool_id.to_le_bytes())?;
145 w.write_all(&(e.data.len() as u32).to_le_bytes())?;
146 w.write_all(&e.data)?;
147 }
148 Ok(())
149}
150
151pub fn decode_header(data: &[u8]) -> Option<u8> {
154 if data.get(..4)? != MAGIC {
155 return None;
156 }
157 let version = *data.get(4)?;
158 Some(version)
159}
160
161pub fn decode_frame<'s>(
163 data: &[u8],
164 schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
165 timestamp_base_ns: u64,
166) -> Option<(Frame, usize)> {
167 let tag = *data.first()?;
168 match tag {
169 TAG_SCHEMA => decode_schema_frame(data),
170 TAG_EVENT => decode_event_frame(data, schema_lookup, timestamp_base_ns),
171 TAG_STRING_POOL => decode_string_pool_frame(data),
172 TAG_TIMESTAMP_RESET => {
173 let ts = u64::from_le_bytes(data.get(1..9)?.try_into().ok()?);
174 Some((Frame::TimestampReset(ts), 9))
175 }
176 _ => None,
177 }
178}
179
180fn decode_schema_frame(data: &[u8]) -> Option<(Frame, usize)> {
181 let mut pos = 1; let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
183 pos += 2;
184 let name_len = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
185 pos += 2;
186 let name = String::from_utf8(data.get(pos..pos + name_len)?.to_vec()).ok()?;
187 pos += name_len;
188 let has_timestamp = *data.get(pos)? != 0;
189 pos += 1;
190 let field_count = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
191 pos += 2;
192 let mut fields = Vec::with_capacity(field_count);
193 for _ in 0..field_count {
194 let fname_len = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
195 pos += 2;
196 let fname = String::from_utf8(data.get(pos..pos + fname_len)?.to_vec()).ok()?;
197 pos += fname_len;
198 let ft = FieldType::from_tag(*data.get(pos)?)?;
199 pos += 1;
200 fields.push(FieldDef {
201 name: fname,
202 field_type: ft,
203 });
204 }
205 Some((
206 Frame::Schema {
207 type_id,
208 entry: SchemaEntry {
209 name,
210 has_timestamp,
211 fields,
212 },
213 },
214 pos,
215 ))
216}
217
218fn decode_event_frame<'s>(
219 data: &[u8],
220 schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
221 timestamp_base_ns: u64,
222) -> Option<(Frame, usize)> {
223 let mut pos = 1; let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
225 pos += 2;
226 let info = schema_lookup(type_id)?;
227
228 let timestamp_ns = if info.has_timestamp {
229 let delta = decode_u24_le(&data[pos..])?;
230 pos += 3;
231 Some(timestamp_base_ns.checked_add(delta as u64)?)
232 } else {
233 None
234 };
235
236 let mut values = Vec::with_capacity(info.field_types.len());
237 let mut remaining = &data[pos..];
238 for ft in info.field_types {
239 let (val, rest) = FieldValue::decode(*ft, remaining)?;
240 values.push(val);
241 remaining = rest;
242 }
243 let consumed = data.len() - remaining.len();
244 Some((
245 Frame::Event {
246 type_id,
247 timestamp_ns,
248 values,
249 },
250 consumed,
251 ))
252}
253
254fn decode_string_pool_frame(data: &[u8]) -> Option<(Frame, usize)> {
255 let mut pos = 1;
256 let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
257 pos += 4;
258 let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 8));
259 for _ in 0..count {
260 let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
261 pos += 4;
262 let len = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
263 pos += 4;
264 let d = data.get(pos..pos + len)?.to_vec();
265 pos += len;
266 entries.push(PoolEntry { pool_id, data: d });
267 }
268 Some((Frame::StringPool(entries), pos))
269}
270
271pub fn decode_frame_ref<'a, 's>(
275 data: &'a [u8],
276 schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
277 timestamp_base_ns: u64,
278) -> Option<(FrameRef<'a>, usize)> {
279 let tag = *data.first()?;
280 match tag {
281 TAG_SCHEMA => {
282 let (frame, consumed) = decode_schema_frame(data)?;
283 match frame {
284 Frame::Schema { type_id, entry } => {
285 Some((FrameRef::Schema { type_id, entry }, consumed))
286 }
287 _ => unreachable!(),
288 }
289 }
290 TAG_EVENT => decode_event_frame_ref(data, schema_lookup, timestamp_base_ns),
291 TAG_STRING_POOL => decode_string_pool_frame_ref(data),
292 TAG_TIMESTAMP_RESET => {
293 let ts = u64::from_le_bytes(data.get(1..9)?.try_into().ok()?);
294 Some((FrameRef::TimestampReset(ts), 9))
295 }
296 _ => None,
297 }
298}
299
300fn decode_event_frame_ref<'a, 's>(
301 data: &'a [u8],
302 schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
303 timestamp_base_ns: u64,
304) -> Option<(FrameRef<'a>, usize)> {
305 let mut pos = 1;
306 let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
307 pos += 2;
308 let info = schema_lookup(type_id)?;
309
310 let timestamp_ns = if info.has_timestamp {
311 let delta = decode_u24_le(&data[pos..])?;
312 pos += 3;
313 Some(timestamp_base_ns.checked_add(delta as u64)?)
314 } else {
315 None
316 };
317
318 let mut values = Vec::with_capacity(info.field_types.len());
319 for ft in info.field_types {
320 let (val, consumed) = FieldValueRef::decode(*ft, data, pos)?;
321 values.push(val);
322 pos += consumed;
323 }
324 Some((
325 FrameRef::Event {
326 type_id,
327 timestamp_ns,
328 values,
329 },
330 pos,
331 ))
332}
333
334fn decode_string_pool_frame_ref<'a>(data: &'a [u8]) -> Option<(FrameRef<'a>, usize)> {
335 let mut pos = 1;
336 let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
337 pos += 4;
338 let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 8));
339 for _ in 0..count {
340 let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
341 pos += 4;
342 let len = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
343 pos += 4;
344 let d = data.get(pos..pos + len)?;
345 pos += len;
346 entries.push(PoolEntryRef { pool_id, data: d });
347 }
348 Some((FrameRef::StringPool(entries), pos))
349}
350
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a schema lookup that resolves only `known`, to the given layout.
    fn single_schema<'a>(
        known: WireTypeId,
        field_types: &'a [FieldType],
        has_timestamp: bool,
    ) -> impl Fn(WireTypeId) -> Option<SchemaInfo<'a>> {
        move |id| {
            if id == known {
                Some(SchemaInfo {
                    field_types,
                    has_timestamp,
                })
            } else {
                None
            }
        }
    }

    // ---- header ----------------------------------------------------------

    #[test]
    fn header_encode_decode() {
        let mut bytes = Vec::new();
        encode_header(&mut bytes).unwrap();
        assert_eq!(bytes, [0x54, 0x52, 0x43, 0x00, 1]);
        assert_eq!(decode_header(&bytes), Some(1));
    }

    #[test]
    fn header_bad_magic() {
        let zeroed_magic: [u8; 5] = [0x00, 0x00, 0x00, 0x00, 1];
        assert_eq!(decode_header(&zeroed_magic), None);
    }

    #[test]
    fn header_too_short() {
        let truncated: [u8; 2] = [0x54, 0x52];
        assert_eq!(decode_header(&truncated), None);
    }

    // ---- schema frames ---------------------------------------------------

    #[test]
    fn schema_frame_round_trip() {
        let type_id = WireTypeId(1);
        let entry = SchemaEntry {
            name: "PollStart".into(),
            has_timestamp: true,
            fields: vec![FieldDef {
                name: "worker".into(),
                field_type: FieldType::Varint,
            }],
        };
        let mut bytes = Vec::new();
        encode_schema(type_id, &entry, &mut bytes).unwrap();
        assert_eq!(bytes[0], TAG_SCHEMA);

        let (frame, consumed) = decode_frame(&bytes, |_| None, 0).unwrap();
        assert_eq!(consumed, bytes.len());
        assert_eq!(frame, Frame::Schema { type_id, entry });
    }

    #[test]
    fn schema_frame_empty_fields() {
        let type_id = WireTypeId(0);
        let entry = SchemaEntry {
            name: "Empty".into(),
            has_timestamp: false,
            fields: vec![],
        };
        let mut bytes = Vec::new();
        encode_schema(type_id, &entry, &mut bytes).unwrap();

        let (frame, _) = decode_frame(&bytes, |_| None, 0).unwrap();
        assert_eq!(frame, Frame::Schema { type_id, entry });
    }

    // ---- event frames ----------------------------------------------------

    #[test]
    fn event_frame_round_trip() {
        let values = vec![
            FieldValue::Varint(12345),
            FieldValue::Bool(true),
            FieldValue::String("hi".to_string()),
        ];
        let mut bytes = Vec::new();
        encode_event(WireTypeId(1), None, &values, &mut bytes).unwrap();
        assert_eq!(bytes[0], TAG_EVENT);

        let types = [FieldType::Varint, FieldType::Bool, FieldType::String];
        let lookup = single_schema(WireTypeId(1), &types, false);
        let (frame, consumed) = decode_frame(&bytes, lookup, 0).unwrap();
        assert_eq!(consumed, bytes.len());
        assert_eq!(
            frame,
            Frame::Event {
                type_id: WireTypeId(1),
                timestamp_ns: None,
                values,
            }
        );
    }

    #[test]
    fn event_frame_with_timestamp_round_trip() {
        let values = vec![FieldValue::Varint(42)];
        let mut bytes = Vec::new();
        encode_event(WireTypeId(1), Some(1_000_000), &values, &mut bytes).unwrap();

        let types = [FieldType::Varint];
        let lookup = single_schema(WireTypeId(1), &types, true);
        let (frame, consumed) = decode_frame(&bytes, lookup, 5_000_000).unwrap();
        assert_eq!(consumed, bytes.len());
        assert_eq!(
            frame,
            Frame::Event {
                type_id: WireTypeId(1),
                timestamp_ns: Some(5_000_000 + 1_000_000),
                values,
            }
        );
    }

    #[test]
    fn event_frame_unknown_type_id() {
        let mut bytes = Vec::new();
        encode_event(WireTypeId(99), None, &[FieldValue::Varint(1)], &mut bytes).unwrap();
        assert!(decode_frame(&bytes, |_| None, 0).is_none());
    }

    #[test]
    fn event_frame_varint_compact() {
        let values = vec![FieldValue::Varint(1_050_000), FieldValue::Varint(3)];
        let mut bytes = Vec::new();
        encode_event(WireTypeId(2), None, &values, &mut bytes).unwrap();
        assert!(
            bytes.len() <= 7,
            "varint PollEnd should be <=7 bytes, got {}",
            bytes.len()
        );

        let types = [FieldType::Varint, FieldType::Varint];
        let lookup = single_schema(WireTypeId(2), &types, false);
        let (frame, consumed) = decode_frame(&bytes, lookup, 0).unwrap();
        assert_eq!(consumed, bytes.len());
        assert_eq!(
            frame,
            Frame::Event {
                type_id: WireTypeId(2),
                timestamp_ns: None,
                values,
            }
        );
    }

    // ---- string pool frames ----------------------------------------------

    #[test]
    fn string_pool_round_trip() {
        let entries = vec![
            PoolEntry {
                pool_id: 0,
                data: b"main_thread".to_vec(),
            },
            PoolEntry {
                pool_id: 1,
                data: b"worker-1".to_vec(),
            },
        ];
        let mut bytes = Vec::new();
        encode_string_pool(&entries, &mut bytes).unwrap();
        assert_eq!(bytes[0], TAG_STRING_POOL);

        let (frame, consumed) = decode_frame(&bytes, |_| None, 0).unwrap();
        assert_eq!(consumed, bytes.len());
        assert_eq!(frame, Frame::StringPool(entries));
    }

    #[test]
    fn string_pool_empty() {
        let mut bytes = Vec::new();
        encode_string_pool(&[], &mut bytes).unwrap();

        let (frame, _) = decode_frame(&bytes, |_| None, 0).unwrap();
        assert_eq!(frame, Frame::StringPool(vec![]));
    }

    // ---- dispatch --------------------------------------------------------

    #[test]
    fn unknown_tag_returns_none() {
        assert!(decode_frame(&[0xFF], |_| None, 0).is_none());
    }
}