1use crate::schema::{FieldDef, SchemaEntry};
9use crate::types::{FieldType, FieldValue, FieldValueRef};
10use std::io::{self, Write};
11
12#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
18pub struct WireTypeId(pub u16);
19
20pub(crate) const MAGIC: [u8; 4] = [0x54, 0x52, 0x43, 0x00]; pub(crate) const VERSION: u8 = 1;
22pub(crate) const HEADER_SIZE: usize = 5;
23
24pub(crate) const TAG_SCHEMA: u8 = 0x01;
25pub(crate) const TAG_EVENT: u8 = 0x02;
26pub(crate) const TAG_STRING_POOL: u8 = 0x03;
27pub(crate) const TAG_TIMESTAMP_RESET: u8 = 0x05;
29
30pub(crate) const MAX_TIMESTAMP_DELTA_NS: u64 = 0xFF_FFFF; #[inline]
35pub(crate) fn encode_u24_le(value: u32, w: &mut impl Write) -> io::Result<()> {
36 debug_assert!(value <= MAX_TIMESTAMP_DELTA_NS as u32);
37 w.write_all(&[value as u8, (value >> 8) as u8, (value >> 16) as u8])
38}
39
40#[inline]
42pub(crate) fn decode_u24_le(data: &[u8]) -> Option<u32> {
43 let b = data.get(..3)?;
44 Some(b[0] as u32 | (b[1] as u32) << 8 | (b[2] as u32) << 16)
45}
46
47#[derive(Debug, Clone, PartialEq)]
49pub struct PoolEntry {
50 pub pool_id: u32,
52 pub data: Vec<u8>,
54}
55
56#[derive(Debug, Clone, PartialEq)]
57pub(crate) enum Frame {
58 Schema {
59 type_id: WireTypeId,
60 entry: SchemaEntry,
61 },
62 Event {
63 type_id: WireTypeId,
64 timestamp_ns: Option<u64>,
66 values: Vec<FieldValue>,
67 },
68 StringPool(Vec<PoolEntry>),
69 TimestampReset(u64),
70}
71
72#[non_exhaustive]
74#[derive(Debug, Clone, PartialEq)]
75pub struct PoolEntryRef<'a> {
76 pub pool_id: u32,
78 pub data: &'a [u8],
80}
81
82#[derive(Debug, Clone, PartialEq)]
84pub(crate) enum FrameRef<'a> {
85 Schema {
86 type_id: WireTypeId,
87 entry: SchemaEntry,
88 },
89 Event {
90 type_id: WireTypeId,
91 timestamp_ns: Option<u64>,
92 values: Vec<FieldValueRef<'a>>,
93 },
94 StringPool(Vec<PoolEntryRef<'a>>),
95 TimestampReset(u64),
96}
97
98pub(crate) struct SchemaInfo<'a> {
101 pub field_tags: &'a [u8],
102 pub has_timestamp: bool,
103}
104
105pub(crate) fn encode_header(w: &mut impl Write) -> io::Result<()> {
108 w.write_all(&MAGIC)?;
109 w.write_all(&[VERSION])
110}
111
112pub(crate) fn encode_schema(
113 type_id: WireTypeId,
114 entry: &SchemaEntry,
115 w: &mut impl Write,
116) -> io::Result<()> {
117 w.write_all(&[TAG_SCHEMA])?;
118 w.write_all(&type_id.0.to_le_bytes())?;
119 let name_bytes = entry.name.as_bytes();
120 w.write_all(&(name_bytes.len() as u16).to_le_bytes())?;
121 w.write_all(name_bytes)?;
122 w.write_all(&[if entry.has_timestamp { 1 } else { 0 }])?;
123 w.write_all(&(entry.fields.len() as u16).to_le_bytes())?;
124 for f in &entry.fields {
125 let fname = f.name.as_bytes();
126 w.write_all(&(fname.len() as u16).to_le_bytes())?;
127 w.write_all(fname)?;
128 w.write_all(&[f.field_type as u8])?;
129 }
130 Ok(())
131}
132
133#[cfg(test)]
136pub(crate) fn encode_event(
137 type_id: WireTypeId,
138 timestamp_delta_ns: Option<u32>,
139 values: &[FieldValue],
140 w: &mut impl Write,
141) -> io::Result<()> {
142 w.write_all(&[TAG_EVENT])?;
143 w.write_all(&type_id.0.to_le_bytes())?;
144 if let Some(delta) = timestamp_delta_ns {
145 encode_u24_le(delta, w)?;
146 }
147 for v in values {
148 v.encode(w)?;
149 }
150 Ok(())
151}
152
153pub(crate) fn encode_string_pool(entries: &[PoolEntry], w: &mut impl Write) -> io::Result<()> {
154 w.write_all(&[TAG_STRING_POOL])?;
155 w.write_all(&(entries.len() as u32).to_le_bytes())?;
156 for e in entries {
157 w.write_all(&e.pool_id.to_le_bytes())?;
158 w.write_all(&(e.data.len() as u32).to_le_bytes())?;
159 w.write_all(&e.data)?;
160 }
161 Ok(())
162}
163
164pub(crate) fn decode_header(data: &[u8]) -> Option<u8> {
167 if data.get(..4)? != MAGIC {
168 return None;
169 }
170 let version = *data.get(4)?;
171 Some(version)
172}
173
174pub(crate) fn decode_frame<'s>(
176 data: &[u8],
177 schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
178 timestamp_base_ns: u64,
179) -> Option<(Frame, usize)> {
180 let tag = *data.first()?;
181 match tag {
182 TAG_SCHEMA => decode_schema_frame(data),
183 TAG_EVENT => decode_event_frame(data, schema_lookup, timestamp_base_ns),
184 TAG_STRING_POOL => decode_string_pool_frame(data),
185 TAG_TIMESTAMP_RESET => {
186 let ts = u64::from_le_bytes(data.get(1..9)?.try_into().ok()?);
187 Some((Frame::TimestampReset(ts), 9))
188 }
189 _ => None,
190 }
191}
192
193fn decode_schema_frame(data: &[u8]) -> Option<(Frame, usize)> {
194 let mut pos = 1; let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
196 pos += 2;
197 let name_len = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
198 pos += 2;
199 let name = String::from_utf8(data.get(pos..pos + name_len)?.to_vec()).ok()?;
200 pos += name_len;
201 let has_timestamp = *data.get(pos)? != 0;
202 pos += 1;
203 let field_count = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
204 pos += 2;
205 let mut fields = Vec::with_capacity(field_count);
206 for _ in 0..field_count {
207 let fname_len = u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?) as usize;
208 pos += 2;
209 let fname = String::from_utf8(data.get(pos..pos + fname_len)?.to_vec()).ok()?;
210 pos += fname_len;
211 let raw_tag = *data.get(pos)?;
212 let ft = FieldType::from_tag(raw_tag)?;
213 pos += 1;
214 fields.push(FieldDef {
215 name: fname,
216 field_type: ft,
217 });
218 }
219 Some((
220 Frame::Schema {
221 type_id,
222 entry: SchemaEntry {
223 name,
224 has_timestamp,
225 fields,
226 },
227 },
228 pos,
229 ))
230}
231
232fn decode_event_frame<'s>(
233 data: &[u8],
234 schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
235 timestamp_base_ns: u64,
236) -> Option<(Frame, usize)> {
237 let mut pos = 1; let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
239 pos += 2;
240 let info = schema_lookup(type_id)?;
241
242 let timestamp_ns = if info.has_timestamp {
243 let delta = decode_u24_le(&data[pos..])?;
244 pos += 3;
245 Some(timestamp_base_ns.checked_add(delta as u64)?)
246 } else {
247 None
248 };
249
250 let mut values = Vec::with_capacity(info.field_tags.len());
251 let mut remaining = &data[pos..];
252 for &tag in info.field_tags {
253 let ft = FieldType::from_tag(tag)?;
254 if ft.is_optional() {
255 let prefix = *remaining.first()?;
256 remaining = &remaining[1..];
257 if prefix == 0x00 {
258 values.push(FieldValue::None);
259 } else {
260 let (val, rest) = FieldValue::decode(ft.inner(), remaining)?;
261 values.push(val);
262 remaining = rest;
263 }
264 } else {
265 let (val, rest) = FieldValue::decode(ft, remaining)?;
266 values.push(val);
267 remaining = rest;
268 }
269 }
270 let consumed = data.len() - remaining.len();
271 Some((
272 Frame::Event {
273 type_id,
274 timestamp_ns,
275 values,
276 },
277 consumed,
278 ))
279}
280
281fn decode_string_pool_frame(data: &[u8]) -> Option<(Frame, usize)> {
282 let mut pos = 1;
283 let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
284 pos += 4;
285 let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 8));
286 for _ in 0..count {
287 let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
288 pos += 4;
289 let len = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
290 pos += 4;
291 let d = data.get(pos..pos + len)?.to_vec();
292 pos += len;
293 entries.push(PoolEntry { pool_id, data: d });
294 }
295 Some((Frame::StringPool(entries), pos))
296}
297
298pub(crate) fn decode_frame_ref<'a, 's>(
302 data: &'a [u8],
303 schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
304 timestamp_base_ns: u64,
305) -> Option<(FrameRef<'a>, usize)> {
306 let tag = *data.first()?;
307 match tag {
308 TAG_SCHEMA => {
309 let (frame, consumed) = decode_schema_frame(data)?;
310 match frame {
311 Frame::Schema { type_id, entry } => {
312 Some((FrameRef::Schema { type_id, entry }, consumed))
313 }
314 _ => unreachable!(),
315 }
316 }
317 TAG_EVENT => decode_event_frame_ref(data, schema_lookup, timestamp_base_ns),
318 TAG_STRING_POOL => decode_string_pool_frame_ref(data),
319 TAG_TIMESTAMP_RESET => {
320 let ts = u64::from_le_bytes(data.get(1..9)?.try_into().ok()?);
321 Some((FrameRef::TimestampReset(ts), 9))
322 }
323 _ => None,
324 }
325}
326
327fn decode_event_frame_ref<'a, 's>(
328 data: &'a [u8],
329 schema_lookup: impl Fn(WireTypeId) -> Option<SchemaInfo<'s>>,
330 timestamp_base_ns: u64,
331) -> Option<(FrameRef<'a>, usize)> {
332 let mut pos = 1;
333 let type_id = WireTypeId(u16::from_le_bytes(data.get(pos..pos + 2)?.try_into().ok()?));
334 pos += 2;
335 let info = schema_lookup(type_id)?;
336
337 let timestamp_ns = if info.has_timestamp {
338 let delta = decode_u24_le(&data[pos..])?;
339 pos += 3;
340 Some(timestamp_base_ns.checked_add(delta as u64)?)
341 } else {
342 None
343 };
344
345 let mut values = Vec::with_capacity(info.field_tags.len());
346 for &tag in info.field_tags {
347 let ft = FieldType::from_tag(tag)?;
348 if ft.is_optional() {
349 let prefix = *data.get(pos)?;
350 pos += 1;
351 if prefix == 0x00 {
352 values.push(FieldValueRef::None);
353 } else {
354 let (val, consumed) = FieldValueRef::decode(ft.inner(), data, pos)?;
355 values.push(val);
356 pos += consumed;
357 }
358 } else {
359 let (val, consumed) = FieldValueRef::decode(ft, data, pos)?;
360 values.push(val);
361 pos += consumed;
362 }
363 }
364 Some((
365 FrameRef::Event {
366 type_id,
367 timestamp_ns,
368 values,
369 },
370 pos,
371 ))
372}
373
374fn decode_string_pool_frame_ref<'a>(data: &'a [u8]) -> Option<(FrameRef<'a>, usize)> {
375 let mut pos = 1;
376 let count = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
377 pos += 4;
378 let mut entries = Vec::with_capacity(count.min((data.len() - pos) / 8));
379 for _ in 0..count {
380 let pool_id = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?);
381 pos += 4;
382 let len = u32::from_le_bytes(data.get(pos..pos + 4)?.try_into().ok()?) as usize;
383 pos += 4;
384 let d = data.get(pos..pos + len)?;
385 pos += len;
386 entries.push(PoolEntryRef { pool_id, data: d });
387 }
388 Some((FrameRef::StringPool(entries), pos))
389}
390
391#[cfg(test)]
392mod tests {
393 use super::*;
394
395 #[test]
398 fn header_encode_decode() {
399 let mut buf = Vec::new();
400 encode_header(&mut buf).unwrap();
401 assert_eq!(buf, [0x54, 0x52, 0x43, 0x00, 1]);
402 assert_eq!(decode_header(&buf), Some(1));
403 }
404
405 #[test]
406 fn header_bad_magic() {
407 assert_eq!(decode_header(&[0x00, 0x00, 0x00, 0x00, 1]), None);
408 }
409
410 #[test]
411 fn header_too_short() {
412 assert_eq!(decode_header(&[0x54, 0x52]), None);
413 }
414
415 #[test]
418 fn schema_frame_round_trip() {
419 let type_id = WireTypeId(1);
420 let entry = SchemaEntry {
421 name: "PollStart".into(),
422 has_timestamp: true,
423 fields: vec![FieldDef {
424 name: "worker".into(),
425 field_type: FieldType::Varint,
426 }],
427 };
428 let mut buf = Vec::new();
429 encode_schema(type_id, &entry, &mut buf).unwrap();
430 assert_eq!(buf[0], TAG_SCHEMA);
431 let (frame, consumed) = decode_frame(&buf, |_| None, 0).unwrap();
432 assert_eq!(consumed, buf.len());
433 assert_eq!(frame, Frame::Schema { type_id, entry });
434 }
435
436 #[test]
437 fn schema_frame_empty_fields() {
438 let type_id = WireTypeId(0);
439 let entry = SchemaEntry {
440 name: "Empty".into(),
441 has_timestamp: false,
442 fields: vec![],
443 };
444 let mut buf = Vec::new();
445 encode_schema(type_id, &entry, &mut buf).unwrap();
446 let (frame, _) = decode_frame(&buf, |_| None, 0).unwrap();
447 assert_eq!(frame, Frame::Schema { type_id, entry });
448 }
449
450 #[test]
453 fn event_frame_round_trip() {
454 let values = vec![
455 FieldValue::Varint(12345),
456 FieldValue::Bool(true),
457 FieldValue::String("hi".to_string()),
458 ];
459 let mut buf = Vec::new();
460 encode_event(WireTypeId(1), None, &values, &mut buf).unwrap();
461 assert_eq!(buf[0], TAG_EVENT);
462
463 let tags: Vec<u8> = vec![
464 FieldType::Varint as u8,
465 FieldType::Bool as u8,
466 FieldType::String as u8,
467 ];
468 let lookup = |id: WireTypeId| -> Option<SchemaInfo<'_>> {
469 if id == WireTypeId(1) {
470 Some(SchemaInfo {
471 field_tags: &tags,
472 has_timestamp: false,
473 })
474 } else {
475 None
476 }
477 };
478 let (frame, consumed) = decode_frame(&buf, lookup, 0).unwrap();
479 assert_eq!(consumed, buf.len());
480 assert_eq!(
481 frame,
482 Frame::Event {
483 type_id: WireTypeId(1),
484 timestamp_ns: None,
485 values
486 }
487 );
488 }
489
490 #[test]
491 fn event_frame_with_timestamp_round_trip() {
492 let values = vec![FieldValue::Varint(42)];
493 let mut buf = Vec::new();
494 encode_event(WireTypeId(1), Some(1_000_000), &values, &mut buf).unwrap();
495
496 let tags: Vec<u8> = vec![FieldType::Varint as u8];
497 let lookup = |id: WireTypeId| -> Option<SchemaInfo<'_>> {
498 if id == WireTypeId(1) {
499 Some(SchemaInfo {
500 field_tags: &tags,
501 has_timestamp: true,
502 })
503 } else {
504 None
505 }
506 };
507 let (frame, consumed) = decode_frame(&buf, lookup, 5_000_000).unwrap();
508 assert_eq!(consumed, buf.len());
509 assert_eq!(
510 frame,
511 Frame::Event {
512 type_id: WireTypeId(1),
513 timestamp_ns: Some(5_000_000 + 1_000_000),
514 values,
515 }
516 );
517 }
518
519 #[test]
520 fn event_frame_unknown_type_id() {
521 let mut buf = Vec::new();
522 encode_event(WireTypeId(99), None, &[FieldValue::Varint(1)], &mut buf).unwrap();
523 assert!(decode_frame(&buf, |_| None, 0).is_none());
524 }
525
526 #[test]
527 fn event_frame_varint_compact() {
528 let values = vec![FieldValue::Varint(1_050_000), FieldValue::Varint(3)];
529 let mut buf = Vec::new();
530 encode_event(WireTypeId(2), None, &values, &mut buf).unwrap();
531 assert!(
532 buf.len() <= 7,
533 "varint PollEnd should be <=7 bytes, got {}",
534 buf.len()
535 );
536
537 let tags: Vec<u8> = vec![FieldType::Varint as u8, FieldType::Varint as u8];
538 let lookup = |id: WireTypeId| -> Option<SchemaInfo<'_>> {
539 if id == WireTypeId(2) {
540 Some(SchemaInfo {
541 field_tags: &tags,
542 has_timestamp: false,
543 })
544 } else {
545 None
546 }
547 };
548 let (frame, consumed) = decode_frame(&buf, lookup, 0).unwrap();
549 assert_eq!(consumed, buf.len());
550 assert_eq!(
551 frame,
552 Frame::Event {
553 type_id: WireTypeId(2),
554 timestamp_ns: None,
555 values
556 }
557 );
558 }
559
560 #[test]
563 fn string_pool_round_trip() {
564 let entries = vec![
565 PoolEntry {
566 pool_id: 0,
567 data: b"main_thread".to_vec(),
568 },
569 PoolEntry {
570 pool_id: 1,
571 data: b"worker-1".to_vec(),
572 },
573 ];
574 let mut buf = Vec::new();
575 encode_string_pool(&entries, &mut buf).unwrap();
576 assert_eq!(buf[0], TAG_STRING_POOL);
577 let (frame, consumed) = decode_frame(&buf, |_| None, 0).unwrap();
578 assert_eq!(consumed, buf.len());
579 assert_eq!(frame, Frame::StringPool(entries));
580 }
581
582 #[test]
583 fn string_pool_empty() {
584 let mut buf = Vec::new();
585 encode_string_pool(&[], &mut buf).unwrap();
586 let (frame, _) = decode_frame(&buf, |_| None, 0).unwrap();
587 assert_eq!(frame, Frame::StringPool(vec![]));
588 }
589
590 #[test]
591 fn unknown_tag_returns_none() {
592 assert!(decode_frame(&[0xFF], |_| None, 0).is_none());
593 }
594
595 #[test]
596 fn truncated_event_frame() {
597 let tags: Vec<u8> = vec![FieldType::Varint as u8];
598 let data = [TAG_EVENT, 0x01];
599 let result = decode_frame(
600 &data,
601 |_| {
602 Some(SchemaInfo {
603 field_tags: &tags,
604 has_timestamp: false,
605 })
606 },
607 0,
608 );
609 assert!(result.is_none());
610 }
611
612 #[test]
613 fn truncated_schema_frame() {
614 let data = [TAG_SCHEMA, 0x00, 0x00];
615 let result = decode_frame(&data, |_: WireTypeId| None, 0);
616 assert!(result.is_none());
617 }
618}