1use crate::schema::{FieldAnnotation, FieldDef, SchemaEntry};
9use crate::types::{FieldType, FieldValue, FieldValueRef};
10use std::io::{self, Write};
11
/// Numeric identifier that ties a frame to a schema on the wire.
///
/// The wrapped `u16` is the value the frame encoders in this module write
/// for `type_id` (little-endian in schema and event frames).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct WireTypeId(pub u16);
19
/// Four-byte signature written at the very start of a stream (`"TRC\0"`).
pub(crate) const MAGIC: [u8; 4] = *b"TRC\0";
/// Format version byte written directly after the magic.
pub(crate) const VERSION: u8 = 1;
/// Total header length in bytes: the magic followed by one version byte.
pub(crate) const HEADER_SIZE: usize = MAGIC.len() + 1;

/// Leading tag byte of a schema-definition frame.
pub(crate) const TAG_SCHEMA: u8 = 0x01;
/// Leading tag byte of an event frame.
pub(crate) const TAG_EVENT: u8 = 0x02;
/// Leading tag byte of a string-pool frame.
pub(crate) const TAG_STRING_POOL: u8 = 0x03;
/// Leading tag byte of a stack-pool frame.
pub(crate) const TAG_STACK_POOL: u8 = 0x04;
/// Leading tag byte of a timestamp-reset frame.
pub(crate) const TAG_TIMESTAMP_RESET: u8 = 0x05;
/// Leading tag byte of a schema-annotations frame.
pub(crate) const TAG_SCHEMA_ANNOTATIONS: u8 = 0x06;
30
/// Largest timestamp delta, in nanoseconds, that fits the 3-byte wire field
/// (`2^24 - 1`).
pub(crate) const MAX_TIMESTAMP_DELTA_NS: u64 = 0xFF_FFFF;

/// Writes the low 24 bits of `value` to `w` as three little-endian bytes.
///
/// Debug builds assert that `value` does not exceed
/// [`MAX_TIMESTAMP_DELTA_NS`]; release builds drop the top byte without
/// checking. Any error from the writer is returned unchanged.
#[inline]
pub(crate) fn encode_u24_le(value: u32, w: &mut impl Write) -> io::Result<()> {
    debug_assert!(value <= MAX_TIMESTAMP_DELTA_NS as u32);
    let [low, mid, high, _] = value.to_le_bytes();
    w.write_all(&[low, mid, high])
}
40
/// Reads a little-endian 24-bit unsigned integer from the front of `data`.
///
/// Only the first three bytes are inspected; anything after them is ignored.
/// Returns `None` when fewer than three bytes are available.
#[inline]
pub(crate) fn decode_u24_le(data: &[u8]) -> Option<u32> {
    match data {
        [low, mid, high, ..] => Some(u32::from_le_bytes([*low, *mid, *high, 0])),
        _ => None,
    }
}
47
/// One owned record of a string-pool frame.
#[derive(Clone, Debug, PartialEq)]
pub struct PoolEntry {
    /// Identifier written as a little-endian `u32` ahead of the payload.
    pub pool_id: u32,
    /// Payload bytes, written verbatim after a little-endian `u32` length.
    pub data: Vec<u8>,
}
56
/// One owned record of a stack-pool frame.
#[derive(Clone, Debug, PartialEq)]
pub struct StackPoolEntry {
    /// Identifier written as a little-endian `u32` ahead of the frames.
    pub pool_id: u32,
    /// Frame values, each written as a little-endian `u64` after a `u32` count.
    pub frames: Vec<u64>,
}
65
66#[derive(Debug, Clone, PartialEq)]
67pub(crate) enum Frame {
68 Schema {
69 type_id: WireTypeId,
70 entry: SchemaEntry,
71 },
72 Event {
73 type_id: WireTypeId,
74 timestamp_ns: Option<u64>,
76 values: Vec<FieldValue>,
77 },
78 StringPool(Vec<PoolEntry>),
79 StackPool(Vec<StackPoolEntry>),
80 TimestampReset(u64),
81 SchemaAnnotations {
82 type_id: WireTypeId,
83 annotations: Vec<FieldAnnotation>,
84 },
85}
86
/// Borrowed counterpart of a string-pool record: the payload is a slice of
/// the buffer that was decoded rather than an owned copy.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq)]
pub struct PoolEntryRef<'a> {
    /// Identifier read as a little-endian `u32`.
    pub pool_id: u32,
    /// Payload bytes borrowed from the input buffer.
    pub data: &'a [u8],
}
96
/// Borrowed counterpart of a stack-pool record: the frames stay in their
/// encoded form inside the buffer that was decoded.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq)]
pub struct StackPoolEntryRef<'a> {
    /// Identifier read as a little-endian `u32`.
    pub pool_id: u32,
    /// Encoded frames: consecutive 8-byte little-endian values.
    pub frames_le: &'a [u8],
}
106
107impl<'a> StackPoolEntryRef<'a> {
108 pub fn frame_count(&self) -> u32 {
110 (self.frames_le.len() / 8) as u32
111 }
112
113 pub fn iter(&self) -> crate::types::StackFrameIter<'a> {
115 crate::types::StackFrameIter::new(self.frames_le, self.frame_count())
116 }
117
118 pub fn to_stack_frames(&self) -> crate::types::StackFrames {
120 self.iter().collect()
121 }
122}
123
124#[derive(Debug, Clone, PartialEq)]
126pub(crate) enum FrameRef<'a> {
127 Schema {
128 type_id: WireTypeId,
129 entry: SchemaEntry,
130 },
131 Event {
132 type_id: WireTypeId,
133 timestamp_ns: Option<u64>,
134 values: Vec<FieldValueRef<'a>>,
135 },
136 StringPool(Vec<PoolEntryRef<'a>>),
137 StackPool(Vec<StackPoolEntryRef<'a>>),
138 TimestampReset(u64),
139 SchemaAnnotations {
140 type_id: WireTypeId,
141 annotations: Vec<FieldAnnotation>,
142 },
143}
144
/// The part of a schema that the event decoders need, as returned by the
/// `schema_lookup` callback.
pub(crate) struct SchemaInfo<'a> {
    /// Raw field-type tag bytes, one per field, in the order the values
    /// appear in an event frame.
    pub field_tags: &'a [u8],
    /// Whether events of this type carry a 3-byte timestamp delta right
    /// after the type id.
    pub has_timestamp: bool,
}
151
152pub(crate) fn encode_header(w: &mut impl Write) -> io::Result<()> {
155 w.write_all(&MAGIC)?;
156 w.write_all(&[VERSION])
157}
158
159pub(crate) fn encode_schema(
160 type_id: WireTypeId,
161 entry: &SchemaEntry,
162 w: &mut impl Write,
163) -> io::Result<()> {
164 w.write_all(&[TAG_SCHEMA])?;
165 w.write_all(&type_id.0.to_le_bytes())?;
166 let name_bytes = entry.name.as_bytes();
167 w.write_all(&(name_bytes.len() as u16).to_le_bytes())?;
168 w.write_all(name_bytes)?;
169 w.write_all(&[if entry.has_timestamp { 1 } else { 0 }])?;
170 w.write_all(&(entry.fields.len() as u16).to_le_bytes())?;
171 for f in &entry.fields {
172 let fname = f.name.as_bytes();
173 w.write_all(&(fname.len() as u16).to_le_bytes())?;
174 w.write_all(fname)?;
175 w.write_all(&[f.field_type as u8])?;
176 }
177 Ok(())
178}
179
/// Test-only encoder for an event frame.
///
/// Writes the tag byte, the little-endian `u16` type id, the optional 3-byte
/// timestamp delta, and then each value through its own `encode` method.
#[cfg(test)]
pub(crate) fn encode_event(
    type_id: WireTypeId,
    timestamp_delta_ns: Option<u32>,
    values: &[FieldValue],
    w: &mut impl Write,
) -> io::Result<()> {
    let id_bytes = type_id.0.to_le_bytes();
    w.write_all(&[TAG_EVENT])?;
    w.write_all(&id_bytes)?;
    match timestamp_delta_ns {
        Some(delta) => encode_u24_le(delta, w)?,
        None => (),
    }
    for value in values.iter() {
        value.encode(w)?;
    }
    Ok(())
}
199
200pub(crate) fn encode_string_pool(entries: &[PoolEntry], w: &mut impl Write) -> io::Result<()> {
201 w.write_all(&[TAG_STRING_POOL])?;
202 w.write_all(&(entries.len() as u32).to_le_bytes())?;
203 for e in entries {
204 w.write_all(&e.pool_id.to_le_bytes())?;
205 w.write_all(&(e.data.len() as u32).to_le_bytes())?;
206 w.write_all(&e.data)?;
207 }
208 Ok(())
209}
210
211pub(crate) fn encode_stack_pool(entries: &[StackPoolEntry], w: &mut impl Write) -> io::Result<()> {
212 w.write_all(&[TAG_STACK_POOL])?;
213 w.write_all(&(entries.len() as u32).to_le_bytes())?;
214 for e in entries {
215 w.write_all(&e.pool_id.to_le_bytes())?;
216 w.write_all(&(e.frames.len() as u32).to_le_bytes())?;
217 for &addr in &e.frames {
218 w.write_all(&addr.to_le_bytes())?;
219 }
220 }
221 Ok(())
222}
223
224pub(crate) fn encode_schema_annotations(
225 type_id: WireTypeId,
226 annotations: &[FieldAnnotation],
227 w: &mut impl Write,
228) -> io::Result<()> {
229 let count: u16 = annotations.len().try_into().map_err(|_| {
230 io::Error::new(
231 io::ErrorKind::InvalidInput,
232 "annotation count exceeds u16::MAX",
233 )
234 })?;
235 w.write_all(&[TAG_SCHEMA_ANNOTATIONS])?;
236 crate::leb128::encode_unsigned(type_id.0 as u64, w)?;
237 w.write_all(&count.to_le_bytes())?;
238 for a in annotations {
239 w.write_all(&a.field_index().to_le_bytes())?;
240 let key_bytes = a.key().as_bytes();
241 w.write_all(&(key_bytes.len() as u16).to_le_bytes())?;
242 w.write_all(key_bytes)?;
243 let value_bytes = a.value().as_bytes();
244 w.write_all(&(value_bytes.len() as u32).to_le_bytes())?;
245 w.write_all(value_bytes)?;
246 }
247 Ok(())
248}
249
250pub(crate) fn decode_header(data: &[u8]) -> Option<u8> {
253 if data.get(..4)? != MAGIC {
254 return None;
255 }
256 let version = *data.get(4)?;
257 Some(version)
258}
259
260pub(crate) fn decode_frame<'s>(
262 data: &[u8],
263 schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
264 timestamp_base_ns: u64,
265) -> Option<(Frame, usize)> {
266 let tag = *data.first()?;
267 match tag {
268 TAG_SCHEMA => decode_schema_frame(data),
269 TAG_EVENT => decode_event_frame(data, schema_lookup, timestamp_base_ns),
270 TAG_STRING_POOL => decode_string_pool_frame(data),
271 TAG_STACK_POOL => decode_stack_pool_frame(data),
272 TAG_TIMESTAMP_RESET => {
273 let ts = u64::from_le_bytes(data.get(1..9)?.try_into().ok()?);
274 Some((Frame::TimestampReset(ts), 9))
275 }
276 TAG_SCHEMA_ANNOTATIONS => decode_schema_annotations_frame(data),
277 _ => None,
278 }
279}
280
281fn decode_schema_frame(data: &[u8]) -> Option<(Frame, usize)> {
282 let mut pos = 1; let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
284 pos += 2;
285 let name_len = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
286 pos += 2;
287 let name = String::from_utf8(data.get(pos..pos + name_len)?.to_vec()).ok()?;
288 pos += name_len;
289 let has_timestamp = *data.get(pos)? != 0;
290 pos += 1;
291 let field_count = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
292 pos += 2;
293 let mut fields = Vec::with_capacity(field_count);
294 for _ in 0..field_count {
295 let fname_len = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
296 pos += 2;
297 let fname = String::from_utf8(data.get(pos..pos + fname_len)?.to_vec()).ok()?;
298 pos += fname_len;
299 let raw_tag = *data.get(pos)?;
300 let ft = FieldType::from_tag(raw_tag)?;
301 pos += 1;
302 fields.push(FieldDef {
303 name: fname,
304 field_type: ft,
305 });
306 }
307 Some((
308 Frame::Schema {
309 type_id,
310 entry: SchemaEntry {
311 name,
312 has_timestamp,
313 fields,
314 annotations: Vec::new(),
315 },
316 },
317 pos,
318 ))
319}
320
321fn decode_event_frame<'s>(
322 data: &[u8],
323 schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
324 timestamp_base_ns: u64,
325) -> Option<(Frame, usize)> {
326 let mut pos = 1; let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
328 pos += 2;
329 let info = schema_lookup(type_id)?;
330
331 let timestamp_ns = if info.has_timestamp {
332 let delta = decode_u24_le(&data[pos..])?;
333 pos += 3;
334 Some(timestamp_base_ns.checked_add(delta as u64)?)
335 } else {
336 None
337 };
338
339 let mut values = Vec::with_capacity(info.field_tags.len());
340 let mut remaining = &data[pos..];
341 for &tag in info.field_tags {
342 let ft = FieldType::from_tag(tag)?;
343 if ft.is_optional() {
344 let prefix = *remaining.first()?;
345 remaining = &remaining[1..];
346 if prefix == 0x00 {
347 values.push(FieldValue::None);
348 } else {
349 let (val, rest) = FieldValue::decode(ft.inner(), remaining)?;
350 values.push(val);
351 remaining = rest;
352 }
353 } else {
354 let (val, rest) = FieldValue::decode(ft, remaining)?;
355 values.push(val);
356 remaining = rest;
357 }
358 }
359 let consumed = data.len() - remaining.len();
360 Some((
361 Frame::Event {
362 type_id,
363 timestamp_ns,
364 values,
365 },
366 consumed,
367 ))
368}
369
370fn decode_string_pool_frame(data: &[u8]) -> Option<(Frame, usize)> {
371 let mut pos = 1;
372 let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
373 pos += 4;
374 let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 8));
375 for _ in 0..count {
376 let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
377 pos += 4;
378 let len = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
379 pos += 4;
380 let d = data.get(pos..pos + len)?.to_vec();
381 pos += len;
382 entries.push(PoolEntry { pool_id, data: d });
383 }
384 Some((Frame::StringPool(entries), pos))
385}
386
387fn decode_stack_pool_frame(data: &[u8]) -> Option<(Frame, usize)> {
388 let mut pos = 1;
389 let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
390 pos += 4;
391 let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 16));
392 for _ in 0..count {
393 let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
394 pos += 4;
395 let frame_count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
396 pos += 4;
397 let bytes = frame_count.checked_mul(8)?;
398 let mut frames = Vec::with_capacity(frame_count);
399 for i in 0..frame_count {
400 let off = pos + i * 8;
401 let addr = u64::from_le_bytes(data.get(off..off + 8)?.try_into().ok()?);
402 frames.push(addr);
403 }
404 pos += bytes;
405 entries.push(StackPoolEntry { pool_id, frames });
406 }
407 Some((Frame::StackPool(entries), pos))
408}
409
410fn decode_schema_annotations_frame(data: &[u8]) -> Option<(Frame, usize)> {
411 let mut pos = 1; let (type_id_raw, consumed) = crate::leb128::decode_unsigned(&data[pos..])?;
413 let type_id = WireTypeId(type_id_raw as u16);
414 pos += consumed;
415 let count = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
416 pos += 2;
417 let mut annotations = Vec::with_capacity(count);
418 for _ in 0..count {
419 let field_index = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?);
420 pos += 2;
421 let key_len = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
422 pos += 2;
423 let key = String::from_utf8(data.get(pos..pos + key_len)?.to_vec()).ok()?;
424 pos += key_len;
425 let value_len = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
426 pos += 4;
427 let value = String::from_utf8(data.get(pos..pos + value_len)?.to_vec()).ok()?;
428 pos += value_len;
429 annotations.push(FieldAnnotation::new(field_index, key, value));
430 }
431 Some((
432 Frame::SchemaAnnotations {
433 type_id,
434 annotations,
435 },
436 pos,
437 ))
438}
439
440pub(crate) fn decode_frame_ref<'a, 's>(
444 data: &'a [u8],
445 schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
446 timestamp_base_ns: u64,
447) -> Option<(FrameRef<'a>, usize)> {
448 let tag = *data.first()?;
449 match tag {
450 TAG_SCHEMA => {
451 let (frame, consumed) = decode_schema_frame(data)?;
452 match frame {
453 Frame::Schema { type_id, entry } => {
454 Some((FrameRef::Schema { type_id, entry }, consumed))
455 }
456 _ => unreachable!(),
457 }
458 }
459 TAG_EVENT => decode_event_frame_ref(data, schema_lookup, timestamp_base_ns),
460 TAG_STRING_POOL => decode_string_pool_frame_ref(data),
461 TAG_STACK_POOL => decode_stack_pool_frame_ref(data),
462 TAG_TIMESTAMP_RESET => {
463 let ts = u64::from_le_bytes(data.get(1..9)?.try_into().ok()?);
464 Some((FrameRef::TimestampReset(ts), 9))
465 }
466 TAG_SCHEMA_ANNOTATIONS => {
467 let (frame, consumed) = decode_schema_annotations_frame(data)?;
468 match frame {
469 Frame::SchemaAnnotations {
470 type_id,
471 annotations,
472 } => Some((
473 FrameRef::SchemaAnnotations {
474 type_id,
475 annotations,
476 },
477 consumed,
478 )),
479 _ => unreachable!(),
480 }
481 }
482 _ => None,
483 }
484}
485
486fn decode_event_frame_ref<'a, 's>(
487 data: &'a [u8],
488 schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
489 timestamp_base_ns: u64,
490) -> Option<(FrameRef<'a>, usize)> {
491 let mut pos = 1;
492 let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
493 pos += 2;
494 let info = schema_lookup(type_id)?;
495
496 let timestamp_ns = if info.has_timestamp {
497 let delta = decode_u24_le(&data[pos..])?;
498 pos += 3;
499 Some(timestamp_base_ns.checked_add(delta as u64)?)
500 } else {
501 None
502 };
503
504 let mut values = Vec::with_capacity(info.field_tags.len());
505 for &tag in info.field_tags {
506 let ft = FieldType::from_tag(tag)?;
507 if ft.is_optional() {
508 let prefix = *data.get(pos)?;
509 pos += 1;
510 if prefix == 0x00 {
511 values.push(FieldValueRef::None);
512 } else {
513 let (val, consumed) = FieldValueRef::decode(ft.inner(), data, pos)?;
514 values.push(val);
515 pos += consumed;
516 }
517 } else {
518 let (val, consumed) = FieldValueRef::decode(ft, data, pos)?;
519 values.push(val);
520 pos += consumed;
521 }
522 }
523 Some((
524 FrameRef::Event {
525 type_id,
526 timestamp_ns,
527 values,
528 },
529 pos,
530 ))
531}
532
533fn decode_string_pool_frame_ref<'a>(data: &'a [u8]) -> Option<(FrameRef<'a>, usize)> {
534 let mut pos = 1;
535 let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
536 pos += 4;
537 let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 8));
538 for _ in 0..count {
539 let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
540 pos += 4;
541 let len = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
542 pos += 4;
543 let d = data.get(pos..pos + len)?;
544 pos += len;
545 entries.push(PoolEntryRef { pool_id, data: d });
546 }
547 Some((FrameRef::StringPool(entries), pos))
548}
549
550fn decode_stack_pool_frame_ref<'a>(data: &'a [u8]) -> Option<(FrameRef<'a>, usize)> {
551 let mut pos = 1;
552 let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
553 pos += 4;
554 let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 16));
555 for _ in 0..count {
556 let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
557 pos += 4;
558 let frame_count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
559 pos += 4;
560 let bytes = frame_count.checked_mul(8)?;
561 let frames_le = data.get(pos..pos + bytes)?;
562 pos += bytes;
563 entries.push(StackPoolEntryRef { pool_id, frames_le });
564 }
565 Some((FrameRef::StackPool(entries), pos))
566}
567
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a schema lookup that knows exactly one type id and answers
    /// `None` for every other id.
    fn lookup_for<'t>(
        known: WireTypeId,
        field_tags: &'t [u8],
        has_timestamp: bool,
    ) -> impl Fn(WireTypeId) -> Option<SchemaInfo<'t>> + 't {
        move |id| {
            if id == known {
                Some(SchemaInfo {
                    field_tags,
                    has_timestamp,
                })
            } else {
                None
            }
        }
    }

    // ---- header ----

    #[test]
    fn header_encode_decode() {
        let mut encoded = Vec::new();
        encode_header(&mut encoded).unwrap();
        assert_eq!(encoded, [0x54, 0x52, 0x43, 0x00, 1]);
        assert_eq!(decode_header(&encoded), Some(1));
    }

    #[test]
    fn header_bad_magic() {
        let zeroed_magic = [0x00, 0x00, 0x00, 0x00, 1];
        assert_eq!(decode_header(&zeroed_magic), None);
    }

    #[test]
    fn header_too_short() {
        let partial_magic = [0x54, 0x52];
        assert_eq!(decode_header(&partial_magic), None);
    }

    // ---- schema frames ----

    #[test]
    fn schema_frame_round_trip() {
        let type_id = WireTypeId(1);
        let worker_field = FieldDef {
            name: "worker".into(),
            field_type: FieldType::Varint,
        };
        let entry = SchemaEntry {
            name: "PollStart".into(),
            has_timestamp: true,
            fields: vec![worker_field],
            annotations: Vec::new(),
        };

        let mut encoded = Vec::new();
        encode_schema(type_id, &entry, &mut encoded).unwrap();
        assert_eq!(encoded[0], TAG_SCHEMA);

        let (frame, consumed) = decode_frame(&encoded, |_| None, 0).unwrap();
        assert_eq!(consumed, encoded.len());
        assert_eq!(frame, Frame::Schema { type_id, entry });
    }

    #[test]
    fn schema_frame_empty_fields() {
        let type_id = WireTypeId(0);
        let entry = SchemaEntry {
            name: "Empty".into(),
            has_timestamp: false,
            fields: vec![],
            annotations: Vec::new(),
        };

        let mut encoded = Vec::new();
        encode_schema(type_id, &entry, &mut encoded).unwrap();

        let (frame, _) = decode_frame(&encoded, |_| None, 0).unwrap();
        assert_eq!(frame, Frame::Schema { type_id, entry });
    }

    // ---- event frames ----

    #[test]
    fn event_frame_round_trip() {
        let type_id = WireTypeId(1);
        let values = vec![
            FieldValue::Varint(12345),
            FieldValue::Bool(true),
            FieldValue::String("hi".to_string()),
        ];
        let mut encoded = Vec::new();
        encode_event(type_id, None, &values, &mut encoded).unwrap();
        assert_eq!(encoded[0], TAG_EVENT);

        let tags = [
            FieldType::Varint as u8,
            FieldType::Bool as u8,
            FieldType::String as u8,
        ];
        let (frame, consumed) =
            decode_frame(&encoded, lookup_for(type_id, &tags, false), 0).unwrap();
        assert_eq!(consumed, encoded.len());
        assert_eq!(
            frame,
            Frame::Event {
                type_id: WireTypeId(1),
                timestamp_ns: None,
                values
            }
        );
    }

    #[test]
    fn event_frame_with_timestamp_round_trip() {
        let type_id = WireTypeId(1);
        let values = vec![FieldValue::Varint(42)];
        let mut encoded = Vec::new();
        encode_event(type_id, Some(1_000_000), &values, &mut encoded).unwrap();

        let tags = [FieldType::Varint as u8];
        let (frame, consumed) =
            decode_frame(&encoded, lookup_for(type_id, &tags, true), 5_000_000).unwrap();
        assert_eq!(consumed, encoded.len());
        assert_eq!(
            frame,
            Frame::Event {
                type_id: WireTypeId(1),
                timestamp_ns: Some(5_000_000 + 1_000_000),
                values,
            }
        );
    }

    #[test]
    fn event_frame_unknown_type_id() {
        let mut encoded = Vec::new();
        encode_event(WireTypeId(99), None, &[FieldValue::Varint(1)], &mut encoded).unwrap();
        let decoded = decode_frame(&encoded, |_| None, 0);
        assert!(decoded.is_none());
    }

    #[test]
    fn event_frame_varint_compact() {
        let type_id = WireTypeId(2);
        let values = vec![FieldValue::Varint(1_050_000), FieldValue::Varint(3)];
        let mut encoded = Vec::new();
        encode_event(type_id, None, &values, &mut encoded).unwrap();
        assert!(
            encoded.len() <= 7,
            "varint PollEnd should be <=7 bytes, got {}",
            encoded.len()
        );

        let tags = [FieldType::Varint as u8, FieldType::Varint as u8];
        let (frame, consumed) =
            decode_frame(&encoded, lookup_for(type_id, &tags, false), 0).unwrap();
        assert_eq!(consumed, encoded.len());
        assert_eq!(
            frame,
            Frame::Event {
                type_id: WireTypeId(2),
                timestamp_ns: None,
                values
            }
        );
    }

    // ---- string pool frames ----

    #[test]
    fn string_pool_round_trip() {
        let entries = vec![
            PoolEntry {
                pool_id: 0,
                data: b"main_thread".to_vec(),
            },
            PoolEntry {
                pool_id: 1,
                data: b"worker-1".to_vec(),
            },
        ];
        let mut encoded = Vec::new();
        encode_string_pool(&entries, &mut encoded).unwrap();
        assert_eq!(encoded[0], TAG_STRING_POOL);

        let (frame, consumed) = decode_frame(&encoded, |_| None, 0).unwrap();
        assert_eq!(consumed, encoded.len());
        assert_eq!(frame, Frame::StringPool(entries));
    }

    #[test]
    fn string_pool_empty() {
        let mut encoded = Vec::new();
        encode_string_pool(&[], &mut encoded).unwrap();

        let (frame, _) = decode_frame(&encoded, |_| None, 0).unwrap();
        assert_eq!(frame, Frame::StringPool(vec![]));
    }

    // ---- malformed input ----

    #[test]
    fn unknown_tag_returns_none() {
        let decoded = decode_frame(&[0xFF], |_| None, 0);
        assert!(decoded.is_none());
    }

    #[test]
    fn truncated_event_frame() {
        let tags = [FieldType::Varint as u8];
        // Tag byte plus only half of the two-byte type id.
        let data = [TAG_EVENT, 0x01];
        let accept_any_id = |_| {
            Some(SchemaInfo {
                field_tags: &tags,
                has_timestamp: false,
            })
        };
        let decoded = decode_frame(&data, accept_any_id, 0);
        assert!(decoded.is_none());
    }

    #[test]
    fn truncated_schema_frame() {
        // Tag byte and type id only; the name length is missing.
        let data = [TAG_SCHEMA, 0x00, 0x00];
        let decoded = decode_frame(&data, |_: WireTypeId| None, 0);
        assert!(decoded.is_none());
    }
}