1use crate::schema::{FieldDef, SchemaEntry};
9use crate::types::{FieldType, FieldValue, FieldValueRef};
10use std::io::{self, Write};
11
12#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
18pub struct WireTypeId(pub u16);
19
20pub(crate) const MAGIC: [u8; 4] = [0x54, 0x52, 0x43, 0x00]; pub(crate) const VERSION: u8 = 1;
22pub(crate) const HEADER_SIZE: usize = 5;
23
24pub(crate) const TAG_SCHEMA: u8 = 0x01;
25pub(crate) const TAG_EVENT: u8 = 0x02;
26pub(crate) const TAG_STRING_POOL: u8 = 0x03;
27pub(crate) const TAG_STACK_POOL: u8 = 0x04;
28pub(crate) const TAG_TIMESTAMP_RESET: u8 = 0x05;
29pub(crate) const MAX_TIMESTAMP_DELTA_NS: u64 = 0xFF_FFFF; #[inline]
36pub(crate) fn encode_u24_le(value: u32, w: &mut impl Write) -> io::Result<()> {
37 debug_assert!(value <= MAX_TIMESTAMP_DELTA_NS as u32);
38 w.write_all(&[value as u8, (value >> 8) as u8, (value >> 16) as u8])
39}
40
41#[inline]
43pub(crate) fn decode_u24_le(data: &[u8]) -> Option<u32> {
44 let b = data.get(..3)?;
45 Some(b[0] as u32 | (b[1] as u32) << 8 | (b[2] as u32) << 16)
46}
47
48#[derive(Debug, Clone, PartialEq)]
50pub struct PoolEntry {
51 pub pool_id: u32,
53 pub data: Vec<u8>,
55}
56
57#[derive(Debug, Clone, PartialEq)]
59pub struct StackPoolEntry {
60 pub pool_id: u32,
62 pub frames: Vec<u64>,
64}
65
66#[derive(Debug, Clone, PartialEq)]
67pub(crate) enum Frame {
68 Schema {
69 type_id: WireTypeId,
70 entry: SchemaEntry,
71 },
72 Event {
73 type_id: WireTypeId,
74 timestamp_ns: Option<u64>,
76 values: Vec<FieldValue>,
77 },
78 StringPool(Vec<PoolEntry>),
79 StackPool(Vec<StackPoolEntry>),
80 TimestampReset(u64),
81}
82
83#[non_exhaustive]
85#[derive(Debug, Clone, PartialEq)]
86pub struct PoolEntryRef<'a> {
87 pub pool_id: u32,
89 pub data: &'a [u8],
91}
92
93#[non_exhaustive]
95#[derive(Debug, Clone, PartialEq)]
96pub struct StackPoolEntryRef<'a> {
97 pub pool_id: u32,
99 pub frames_le: &'a [u8],
101}
102
103impl<'a> StackPoolEntryRef<'a> {
104 pub fn frame_count(&self) -> u32 {
106 (self.frames_le.len() / 8) as u32
107 }
108
109 pub fn iter(&self) -> crate::types::StackFrameIter<'a> {
111 crate::types::StackFrameIter::new(self.frames_le, self.frame_count())
112 }
113
114 pub fn to_stack_frames(&self) -> crate::types::StackFrames {
116 self.iter().collect()
117 }
118}
119
120#[derive(Debug, Clone, PartialEq)]
122pub(crate) enum FrameRef<'a> {
123 Schema {
124 type_id: WireTypeId,
125 entry: SchemaEntry,
126 },
127 Event {
128 type_id: WireTypeId,
129 timestamp_ns: Option<u64>,
130 values: Vec<FieldValueRef<'a>>,
131 },
132 StringPool(Vec<PoolEntryRef<'a>>),
133 StackPool(Vec<StackPoolEntryRef<'a>>),
134 TimestampReset(u64),
135}
136
137pub(crate) struct SchemaInfo<'a> {
140 pub field_tags: &'a [u8],
141 pub has_timestamp: bool,
142}
143
144pub(crate) fn encode_header(w: &mut impl Write) -> io::Result<()> {
147 w.write_all(&MAGIC)?;
148 w.write_all(&[VERSION])
149}
150
151pub(crate) fn encode_schema(
152 type_id: WireTypeId,
153 entry: &SchemaEntry,
154 w: &mut impl Write,
155) -> io::Result<()> {
156 w.write_all(&[TAG_SCHEMA])?;
157 w.write_all(&type_id.0.to_le_bytes())?;
158 let name_bytes = entry.name.as_bytes();
159 w.write_all(&(name_bytes.len() as u16).to_le_bytes())?;
160 w.write_all(name_bytes)?;
161 w.write_all(&[if entry.has_timestamp { 1 } else { 0 }])?;
162 w.write_all(&(entry.fields.len() as u16).to_le_bytes())?;
163 for f in &entry.fields {
164 let fname = f.name.as_bytes();
165 w.write_all(&(fname.len() as u16).to_le_bytes())?;
166 w.write_all(fname)?;
167 w.write_all(&[f.field_type as u8])?;
168 }
169 Ok(())
170}
171
172#[cfg(test)]
175pub(crate) fn encode_event(
176 type_id: WireTypeId,
177 timestamp_delta_ns: Option<u32>,
178 values: &[FieldValue],
179 w: &mut impl Write,
180) -> io::Result<()> {
181 w.write_all(&[TAG_EVENT])?;
182 w.write_all(&type_id.0.to_le_bytes())?;
183 if let Some(delta) = timestamp_delta_ns {
184 encode_u24_le(delta, w)?;
185 }
186 for v in values {
187 v.encode(w)?;
188 }
189 Ok(())
190}
191
192pub(crate) fn encode_string_pool(entries: &[PoolEntry], w: &mut impl Write) -> io::Result<()> {
193 w.write_all(&[TAG_STRING_POOL])?;
194 w.write_all(&(entries.len() as u32).to_le_bytes())?;
195 for e in entries {
196 w.write_all(&e.pool_id.to_le_bytes())?;
197 w.write_all(&(e.data.len() as u32).to_le_bytes())?;
198 w.write_all(&e.data)?;
199 }
200 Ok(())
201}
202
203pub(crate) fn encode_stack_pool(entries: &[StackPoolEntry], w: &mut impl Write) -> io::Result<()> {
204 w.write_all(&[TAG_STACK_POOL])?;
205 w.write_all(&(entries.len() as u32).to_le_bytes())?;
206 for e in entries {
207 w.write_all(&e.pool_id.to_le_bytes())?;
208 w.write_all(&(e.frames.len() as u32).to_le_bytes())?;
209 for &addr in &e.frames {
210 w.write_all(&addr.to_le_bytes())?;
211 }
212 }
213 Ok(())
214}
215
216pub(crate) fn decode_header(data: &[u8]) -> Option<u8> {
219 if data.get(..4)? != MAGIC {
220 return None;
221 }
222 let version = *data.get(4)?;
223 Some(version)
224}
225
226pub(crate) fn decode_frame<'s>(
228 data: &[u8],
229 schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
230 timestamp_base_ns: u64,
231) -> Option<(Frame, usize)> {
232 let tag = *data.first()?;
233 match tag {
234 TAG_SCHEMA => decode_schema_frame(data),
235 TAG_EVENT => decode_event_frame(data, schema_lookup, timestamp_base_ns),
236 TAG_STRING_POOL => decode_string_pool_frame(data),
237 TAG_STACK_POOL => decode_stack_pool_frame(data),
238 TAG_TIMESTAMP_RESET => {
239 let ts = u64::from_le_bytes(data.get(1..9)?.try_into().ok()?);
240 Some((Frame::TimestampReset(ts), 9))
241 }
242 _ => None,
243 }
244}
245
246fn decode_schema_frame(data: &[u8]) -> Option<(Frame, usize)> {
247 let mut pos = 1; let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
249 pos += 2;
250 let name_len = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
251 pos += 2;
252 let name = String::from_utf8(data.get(pos..pos + name_len)?.to_vec()).ok()?;
253 pos += name_len;
254 let has_timestamp = *data.get(pos)? != 0;
255 pos += 1;
256 let field_count = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
257 pos += 2;
258 let mut fields = Vec::with_capacity(field_count);
259 for _ in 0..field_count {
260 let fname_len = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
261 pos += 2;
262 let fname = String::from_utf8(data.get(pos..pos + fname_len)?.to_vec()).ok()?;
263 pos += fname_len;
264 let raw_tag = *data.get(pos)?;
265 let ft = FieldType::from_tag(raw_tag)?;
266 pos += 1;
267 fields.push(FieldDef {
268 name: fname,
269 field_type: ft,
270 });
271 }
272 Some((
273 Frame::Schema {
274 type_id,
275 entry: SchemaEntry {
276 name,
277 has_timestamp,
278 fields,
279 },
280 },
281 pos,
282 ))
283}
284
285fn decode_event_frame<'s>(
286 data: &[u8],
287 schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
288 timestamp_base_ns: u64,
289) -> Option<(Frame, usize)> {
290 let mut pos = 1; let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
292 pos += 2;
293 let info = schema_lookup(type_id)?;
294
295 let timestamp_ns = if info.has_timestamp {
296 let delta = decode_u24_le(&data[pos..])?;
297 pos += 3;
298 Some(timestamp_base_ns.checked_add(delta as u64)?)
299 } else {
300 None
301 };
302
303 let mut values = Vec::with_capacity(info.field_tags.len());
304 let mut remaining = &data[pos..];
305 for &tag in info.field_tags {
306 let ft = FieldType::from_tag(tag)?;
307 if ft.is_optional() {
308 let prefix = *remaining.first()?;
309 remaining = &remaining[1..];
310 if prefix == 0x00 {
311 values.push(FieldValue::None);
312 } else {
313 let (val, rest) = FieldValue::decode(ft.inner(), remaining)?;
314 values.push(val);
315 remaining = rest;
316 }
317 } else {
318 let (val, rest) = FieldValue::decode(ft, remaining)?;
319 values.push(val);
320 remaining = rest;
321 }
322 }
323 let consumed = data.len() - remaining.len();
324 Some((
325 Frame::Event {
326 type_id,
327 timestamp_ns,
328 values,
329 },
330 consumed,
331 ))
332}
333
334fn decode_string_pool_frame(data: &[u8]) -> Option<(Frame, usize)> {
335 let mut pos = 1;
336 let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
337 pos += 4;
338 let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 8));
339 for _ in 0..count {
340 let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
341 pos += 4;
342 let len = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
343 pos += 4;
344 let d = data.get(pos..pos + len)?.to_vec();
345 pos += len;
346 entries.push(PoolEntry { pool_id, data: d });
347 }
348 Some((Frame::StringPool(entries), pos))
349}
350
351fn decode_stack_pool_frame(data: &[u8]) -> Option<(Frame, usize)> {
352 let mut pos = 1;
353 let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
354 pos += 4;
355 let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 16));
356 for _ in 0..count {
357 let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
358 pos += 4;
359 let frame_count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
360 pos += 4;
361 let bytes = frame_count.checked_mul(8)?;
362 let mut frames = Vec::with_capacity(frame_count);
363 for i in 0..frame_count {
364 let off = pos + i * 8;
365 let addr = u64::from_le_bytes(data.get(off..off + 8)?.try_into().ok()?);
366 frames.push(addr);
367 }
368 pos += bytes;
369 entries.push(StackPoolEntry { pool_id, frames });
370 }
371 Some((Frame::StackPool(entries), pos))
372}
373
374pub(crate) fn decode_frame_ref<'a, 's>(
378 data: &'a [u8],
379 schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
380 timestamp_base_ns: u64,
381) -> Option<(FrameRef<'a>, usize)> {
382 let tag = *data.first()?;
383 match tag {
384 TAG_SCHEMA => {
385 let (frame, consumed) = decode_schema_frame(data)?;
386 match frame {
387 Frame::Schema { type_id, entry } => {
388 Some((FrameRef::Schema { type_id, entry }, consumed))
389 }
390 _ => unreachable!(),
391 }
392 }
393 TAG_EVENT => decode_event_frame_ref(data, schema_lookup, timestamp_base_ns),
394 TAG_STRING_POOL => decode_string_pool_frame_ref(data),
395 TAG_STACK_POOL => decode_stack_pool_frame_ref(data),
396 TAG_TIMESTAMP_RESET => {
397 let ts = u64::from_le_bytes(data.get(1..9)?.try_into().ok()?);
398 Some((FrameRef::TimestampReset(ts), 9))
399 }
400 _ => None,
401 }
402}
403
404fn decode_event_frame_ref<'a, 's>(
405 data: &'a [u8],
406 schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
407 timestamp_base_ns: u64,
408) -> Option<(FrameRef<'a>, usize)> {
409 let mut pos = 1;
410 let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
411 pos += 2;
412 let info = schema_lookup(type_id)?;
413
414 let timestamp_ns = if info.has_timestamp {
415 let delta = decode_u24_le(&data[pos..])?;
416 pos += 3;
417 Some(timestamp_base_ns.checked_add(delta as u64)?)
418 } else {
419 None
420 };
421
422 let mut values = Vec::with_capacity(info.field_tags.len());
423 for &tag in info.field_tags {
424 let ft = FieldType::from_tag(tag)?;
425 if ft.is_optional() {
426 let prefix = *data.get(pos)?;
427 pos += 1;
428 if prefix == 0x00 {
429 values.push(FieldValueRef::None);
430 } else {
431 let (val, consumed) = FieldValueRef::decode(ft.inner(), data, pos)?;
432 values.push(val);
433 pos += consumed;
434 }
435 } else {
436 let (val, consumed) = FieldValueRef::decode(ft, data, pos)?;
437 values.push(val);
438 pos += consumed;
439 }
440 }
441 Some((
442 FrameRef::Event {
443 type_id,
444 timestamp_ns,
445 values,
446 },
447 pos,
448 ))
449}
450
451fn decode_string_pool_frame_ref<'a>(data: &'a [u8]) -> Option<(FrameRef<'a>, usize)> {
452 let mut pos = 1;
453 let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
454 pos += 4;
455 let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 8));
456 for _ in 0..count {
457 let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
458 pos += 4;
459 let len = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
460 pos += 4;
461 let d = data.get(pos..pos + len)?;
462 pos += len;
463 entries.push(PoolEntryRef { pool_id, data: d });
464 }
465 Some((FrameRef::StringPool(entries), pos))
466}
467
468fn decode_stack_pool_frame_ref<'a>(data: &'a [u8]) -> Option<(FrameRef<'a>, usize)> {
469 let mut pos = 1;
470 let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
471 pos += 4;
472 let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 16));
473 for _ in 0..count {
474 let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
475 pos += 4;
476 let frame_count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
477 pos += 4;
478 let bytes = frame_count.checked_mul(8)?;
479 let frames_le = data.get(pos..pos + bytes)?;
480 pos += bytes;
481 entries.push(StackPoolEntryRef { pool_id, frames_le });
482 }
483 Some((FrameRef::StackPool(entries), pos))
484}
485
486#[cfg(test)]
487mod tests {
488 use super::*;
489
490 #[test]
493 fn header_encode_decode() {
494 let mut buf = Vec::new();
495 encode_header(&mut buf).unwrap();
496 assert_eq!(buf, [0x54, 0x52, 0x43, 0x00, 1]);
497 assert_eq!(decode_header(&buf), Some(1));
498 }
499
500 #[test]
501 fn header_bad_magic() {
502 assert_eq!(decode_header(&[0x00, 0x00, 0x00, 0x00, 1]), None);
503 }
504
505 #[test]
506 fn header_too_short() {
507 assert_eq!(decode_header(&[0x54, 0x52]), None);
508 }
509
510 #[test]
513 fn schema_frame_round_trip() {
514 let type_id = WireTypeId(1);
515 let entry = SchemaEntry {
516 name: "PollStart".into(),
517 has_timestamp: true,
518 fields: vec![FieldDef {
519 name: "worker".into(),
520 field_type: FieldType::Varint,
521 }],
522 };
523 let mut buf = Vec::new();
524 encode_schema(type_id, &entry, &mut buf).unwrap();
525 assert_eq!(buf[0], TAG_SCHEMA);
526 let (frame, consumed) = decode_frame(&buf, |_| None, 0).unwrap();
527 assert_eq!(consumed, buf.len());
528 assert_eq!(frame, Frame::Schema { type_id, entry });
529 }
530
531 #[test]
532 fn schema_frame_empty_fields() {
533 let type_id = WireTypeId(0);
534 let entry = SchemaEntry {
535 name: "Empty".into(),
536 has_timestamp: false,
537 fields: vec![],
538 };
539 let mut buf = Vec::new();
540 encode_schema(type_id, &entry, &mut buf).unwrap();
541 let (frame, _) = decode_frame(&buf, |_| None, 0).unwrap();
542 assert_eq!(frame, Frame::Schema { type_id, entry });
543 }
544
545 #[test]
548 fn event_frame_round_trip() {
549 let values = vec![
550 FieldValue::Varint(12345),
551 FieldValue::Bool(true),
552 FieldValue::String("hi".to_string()),
553 ];
554 let mut buf = Vec::new();
555 encode_event(WireTypeId(1), None, &values, &mut buf).unwrap();
556 assert_eq!(buf[0], TAG_EVENT);
557
558 let tags: Vec<u8> = vec![
559 FieldType::Varint as u8,
560 FieldType::Bool as u8,
561 FieldType::String as u8,
562 ];
563 let lookup = |id: WireTypeId| -> Option<SchemaInfo<'_>> {
564 if id == WireTypeId(1) {
565 Some(SchemaInfo {
566 field_tags: &tags,
567 has_timestamp: false,
568 })
569 } else {
570 None
571 }
572 };
573 let (frame, consumed) = decode_frame(&buf, lookup, 0).unwrap();
574 assert_eq!(consumed, buf.len());
575 assert_eq!(
576 frame,
577 Frame::Event {
578 type_id: WireTypeId(1),
579 timestamp_ns: None,
580 values
581 }
582 );
583 }
584
585 #[test]
586 fn event_frame_with_timestamp_round_trip() {
587 let values = vec![FieldValue::Varint(42)];
588 let mut buf = Vec::new();
589 encode_event(WireTypeId(1), Some(1_000_000), &values, &mut buf).unwrap();
590
591 let tags: Vec<u8> = vec![FieldType::Varint as u8];
592 let lookup = |id: WireTypeId| -> Option<SchemaInfo<'_>> {
593 if id == WireTypeId(1) {
594 Some(SchemaInfo {
595 field_tags: &tags,
596 has_timestamp: true,
597 })
598 } else {
599 None
600 }
601 };
602 let (frame, consumed) = decode_frame(&buf, lookup, 5_000_000).unwrap();
603 assert_eq!(consumed, buf.len());
604 assert_eq!(
605 frame,
606 Frame::Event {
607 type_id: WireTypeId(1),
608 timestamp_ns: Some(5_000_000 + 1_000_000),
609 values,
610 }
611 );
612 }
613
614 #[test]
615 fn event_frame_unknown_type_id() {
616 let mut buf = Vec::new();
617 encode_event(WireTypeId(99), None, &[FieldValue::Varint(1)], &mut buf).unwrap();
618 assert!(decode_frame(&buf, |_| None, 0).is_none());
619 }
620
621 #[test]
622 fn event_frame_varint_compact() {
623 let values = vec![FieldValue::Varint(1_050_000), FieldValue::Varint(3)];
624 let mut buf = Vec::new();
625 encode_event(WireTypeId(2), None, &values, &mut buf).unwrap();
626 assert!(
627 buf.len() <= 7,
628 "varint PollEnd should be <=7 bytes, got {}",
629 buf.len()
630 );
631
632 let tags: Vec<u8> = vec![FieldType::Varint as u8, FieldType::Varint as u8];
633 let lookup = |id: WireTypeId| -> Option<SchemaInfo<'_>> {
634 if id == WireTypeId(2) {
635 Some(SchemaInfo {
636 field_tags: &tags,
637 has_timestamp: false,
638 })
639 } else {
640 None
641 }
642 };
643 let (frame, consumed) = decode_frame(&buf, lookup, 0).unwrap();
644 assert_eq!(consumed, buf.len());
645 assert_eq!(
646 frame,
647 Frame::Event {
648 type_id: WireTypeId(2),
649 timestamp_ns: None,
650 values
651 }
652 );
653 }
654
655 #[test]
658 fn string_pool_round_trip() {
659 let entries = vec![
660 PoolEntry {
661 pool_id: 0,
662 data: b"main_thread".to_vec(),
663 },
664 PoolEntry {
665 pool_id: 1,
666 data: b"worker-1".to_vec(),
667 },
668 ];
669 let mut buf = Vec::new();
670 encode_string_pool(&entries, &mut buf).unwrap();
671 assert_eq!(buf[0], TAG_STRING_POOL);
672 let (frame, consumed) = decode_frame(&buf, |_| None, 0).unwrap();
673 assert_eq!(consumed, buf.len());
674 assert_eq!(frame, Frame::StringPool(entries));
675 }
676
677 #[test]
678 fn string_pool_empty() {
679 let mut buf = Vec::new();
680 encode_string_pool(&[], &mut buf).unwrap();
681 let (frame, _) = decode_frame(&buf, |_| None, 0).unwrap();
682 assert_eq!(frame, Frame::StringPool(vec![]));
683 }
684
685 #[test]
686 fn unknown_tag_returns_none() {
687 assert!(decode_frame(&[0xFF], |_| None, 0).is_none());
688 }
689
690 #[test]
691 fn truncated_event_frame() {
692 let tags: Vec<u8> = vec![FieldType::Varint as u8];
693 let data = [TAG_EVENT, 0x01];
694 let result = decode_frame(
695 &data,
696 |_| {
697 Some(SchemaInfo {
698 field_tags: &tags,
699 has_timestamp: false,
700 })
701 },
702 0,
703 );
704 assert!(result.is_none());
705 }
706
707 #[test]
708 fn truncated_schema_frame() {
709 let data = [TAG_SCHEMA, 0x00, 0x00];
710 let result = decode_frame(&data, |_: WireTypeId| None, 0);
711 assert!(result.is_none());
712 }
713}