// dial9_trace_format/encoder.rs
//
// High-level encoder API
2
3use crate::TraceEvent;
4use crate::codec::{self, PoolEntry, WireTypeId};
5use crate::schema::{SchemaEntry, SchemaRegistry};
6use crate::types::{EncodeState, EventEncoder, InternedString};
7use std::any::TypeId;
8use std::collections::HashMap;
9use std::io::{self, Write};
10use std::sync::Arc;
11
12/// A schema handle returned by [`Encoder::register_schema`] or created via
13/// [`Schema::new`].
14///
15/// Carries the full schema definition (name + fields) so it can auto-register
16/// itself with any encoder on first use. This means a `Schema` created on one
17/// encoder can be passed to a different encoder and it will just work.
18///
19/// `Schema` is cheap to clone (internally `Arc`-backed).
20#[derive(Clone, Debug)]
21pub struct Schema {
22    entry: Arc<SchemaEntry>,
23    /// Pre-computed `Arc<str>` of the schema name, used as a cheap HashMap key
24    /// (clone is a pointer bump instead of a String allocation).
25    name_key: Arc<str>,
26}
27
28impl Schema {
29    /// Create a schema handle without an encoder.
30    ///
31    /// The schema will be lazily registered the first time it is passed to
32    /// [`Encoder::write_event`].
33    pub fn new(name: &str, fields: Vec<crate::schema::FieldDef>) -> Self {
34        let name_key: Arc<str> = Arc::from(name);
35        Self {
36            entry: Arc::new(SchemaEntry {
37                name: name.to_string(),
38                has_timestamp: true,
39                fields,
40            }),
41            name_key,
42        }
43    }
44
45    /// Schema name.
46    pub fn name(&self) -> &str {
47        &self.entry.name
48    }
49
50    /// Schema field definitions.
51    pub fn fields(&self) -> &[crate::schema::FieldDef] {
52        &self.entry.fields
53    }
54}
55
/// Lookup key for the encoder's schema-ID cache.
///
/// Schemas registered manually are keyed by their name; schemas coming from
/// the derive-macro path (see `Encoder::write`) are additionally keyed by the
/// Rust `TypeId` of the event type.
#[derive(Clone, PartialEq, Eq, Hash)]
enum SchemaKey {
    /// Keyed by schema name.
    Name(Arc<str>),
    /// Keyed by the `TypeId` of a `TraceEvent` implementor.
    RustType(TypeId),
}
63
64pub struct Encoder<W: Write = Vec<u8>> {
65    state: EncodeState<W>,
66    registry: SchemaRegistry,
67    string_pool: HashMap<String, u32>,
68    next_pool_id: u32,
69    schema_ids: HashMap<SchemaKey, WireTypeId>,
70}
71
72impl Default for Encoder<Vec<u8>> {
73    fn default() -> Self {
74        Self::new()
75    }
76}
77
78impl Encoder<Vec<u8>> {
79    pub fn new() -> Self {
80        let mut buf = Vec::new();
81        codec::encode_header(&mut buf).expect("Vec::write_all cannot fail");
82        Self {
83            state: EncodeState::new(buf),
84            registry: SchemaRegistry::new(),
85            string_pool: HashMap::new(),
86            next_pool_id: 0,
87            schema_ids: HashMap::new(),
88        }
89    }
90
91    /// Consume the encoder and return the encoded bytes.
92    pub fn finish(self) -> Vec<u8> {
93        self.state.writer.into_inner()
94    }
95}
96
97impl<W: Write> Encoder<W> {
98    /// Create an encoder that writes to an arbitrary writer.
99    /// Writes the file header immediately.
100    pub fn new_to(mut writer: W) -> io::Result<Self> {
101        codec::encode_header(&mut writer)?;
102        Ok(Self {
103            state: EncodeState::new(writer),
104            registry: SchemaRegistry::new(),
105            string_pool: HashMap::new(),
106            next_pool_id: 0,
107            schema_ids: HashMap::new(),
108        })
109    }
110
111    /// Create an encoder seeded from decoded state. Used by
112    /// [`Decoder::into_encoder`](crate::decoder::Decoder::into_encoder).
113    pub(crate) fn from_decoder(
114        mut registry: SchemaRegistry,
115        string_pool: crate::decoder::StringPool,
116        timestamp_base_ns: u64,
117        writer: W,
118    ) -> Self {
119        let mut pool = HashMap::new();
120        let mut next_pool_id: u32 = 0;
121        for (id, value) in string_pool.0.into_iter() {
122            pool.insert(value, id.raw_id());
123            if id.raw_id() >= next_pool_id {
124                next_pool_id = id.raw_id() + 1;
125            }
126        }
127
128        let mut schema_ids = HashMap::new();
129        for (wire_id, entry) in registry.entries() {
130            schema_ids.insert(SchemaKey::Name(Arc::from(entry.name.as_str())), wire_id);
131        }
132        registry.sync_next_id();
133
134        let mut state = EncodeState::new(writer);
135        state.timestamp_base_ns = timestamp_base_ns;
136
137        Self {
138            state,
139            registry,
140            string_pool: pool,
141            next_pool_id,
142            schema_ids,
143        }
144    }
145
146    /// Consume the encoder and return the inner writer.
147    pub fn into_inner(self) -> W {
148        self.state.writer.into_inner()
149    }
150
151    /// Borrow the inner writer.
152    pub fn as_inner(&self) -> &W {
153        self.state.writer.inner()
154    }
155
156    /// Total bytes written through this encoder (including the file header).
157    pub fn bytes_written(&self) -> u64 {
158        self.state.writer.bytes_written()
159    }
160
161    /// Ensure a schema is registered with this encoder. Returns the wire type
162    /// ID for this encoder's output stream.
163    ///
164    /// Idempotent if the schema matches. Errors if a different schema was
165    /// already registered under the same name.
166    fn ensure_registered(&mut self, schema: &Schema) -> io::Result<WireTypeId> {
167        let key = SchemaKey::Name(Arc::clone(&schema.name_key));
168        if let Some(&wire_id) = self.schema_ids.get(&key) {
169            let existing = self.registry.get(wire_id).unwrap();
170            if *existing == *schema.entry {
171                return Ok(wire_id);
172            }
173            return Err(io::Error::new(
174                io::ErrorKind::InvalidInput,
175                format!(
176                    "schema already registered with different definition: {}",
177                    schema.name()
178                ),
179            ));
180        }
181        let id = self.registry.next_type_id();
182        codec::encode_schema(id, &schema.entry, &mut self.state.writer)?;
183        self.registry
184            .register(id, (*schema.entry).clone())
185            .expect("schema registration failed");
186        self.schema_ids.insert(key, id);
187        Ok(id)
188    }
189
190    /// Register a schema by name. Returns a [`Schema`] handle that can be
191    /// passed to [`write_event`](Self::write_event) (on this or any other
192    /// encoder).
193    ///
194    /// All schemas have timestamps. When writing events, the first element of
195    /// `values` must be `FieldValue::Varint(timestamp_ns)`. It is extracted and
196    /// encoded in the event header (not as a regular field).
197    ///
198    /// Eagerly writes the schema frame. Idempotent if the definition matches.
199    pub fn register_schema(
200        &mut self,
201        name: &str,
202        fields: Vec<crate::schema::FieldDef>,
203    ) -> io::Result<Schema> {
204        let schema = Schema::new(name, fields);
205        self.ensure_registered(&schema)?;
206        Ok(schema)
207    }
208
209    /// Write an event for a schema.
210    ///
211    /// The first element of `values` must be `FieldValue::Varint(timestamp_ns)`
212    /// — it is extracted and encoded in the event header, not as a regular
213    /// field. The remaining values must match the schema's field count.
214    ///
215    /// If this encoder hasn't seen `schema` before, it is auto-registered
216    /// (the schema frame is written before the event).
217    pub fn write_event(
218        &mut self,
219        schema: &Schema,
220        values: &[crate::types::FieldValue],
221    ) -> io::Result<()> {
222        use crate::types::FieldValue;
223
224        let type_id = self.ensure_registered(schema)?;
225        let expected_fields = schema.entry.fields.len();
226
227        let ts_ns = match values.first() {
228            Some(FieldValue::Varint(ns)) => *ns,
229            _ => {
230                return Err(io::Error::new(
231                    io::ErrorKind::InvalidInput,
232                    "first value must be FieldValue::Varint(timestamp_ns)",
233                ));
234            }
235        };
236        let field_values = &values[1..];
237
238        if field_values.len() != expected_fields {
239            return Err(io::Error::new(
240                io::ErrorKind::InvalidInput,
241                format!(
242                    "value count ({}) does not match schema field count ({}) for schema '{}'",
243                    field_values.len(),
244                    expected_fields,
245                    schema.name(),
246                ),
247            ));
248        }
249
250        let ts_delta = self.state.encode_timestamp_delta(ts_ns)?;
251        self.state.writer.write_all(&[codec::TAG_EVENT])?;
252        self.state.writer.write_all(&type_id.0.to_le_bytes())?;
253        codec::encode_u24_le(ts_delta, &mut self.state.writer)?;
254        let mut enc = EventEncoder::new(&mut self.state);
255        for v in field_values {
256            enc.write_field_value(v)?;
257        }
258        Ok(())
259    }
260
261    /// Write a derived TraceEvent. Auto-registers the schema on first call for this type.
262    /// Handles timestamp encoding: emits TimestampReset if needed, packs u24 delta in header.
263    pub fn write<T: TraceEvent + 'static>(&mut self, event: &T) -> io::Result<()> {
264        let key = SchemaKey::RustType(TypeId::of::<T>());
265        let tid = if let Some(&cached) = self.schema_ids.get(&key) {
266            cached
267        } else {
268            let entry = T::schema_entry();
269            let schema = Schema::new(&entry.name, entry.fields);
270            let id = self.ensure_registered(&schema)?;
271            self.schema_ids.insert(key, id);
272            id
273        };
274        let ts_ns = event.timestamp();
275        let ts_delta = self.state.encode_timestamp_delta(ts_ns)?;
276        self.state.writer.write_all(&[codec::TAG_EVENT])?;
277        self.state.writer.write_all(&tid.0.to_le_bytes())?;
278        codec::encode_u24_le(ts_delta, &mut self.state.writer)?;
279        let mut enc = EventEncoder::new(&mut self.state);
280        event.encode_fields(&mut enc)
281    }
282
283    /// Intern a string, emitting a pool frame if new. Returns an [`InternedString`] handle.
284    pub fn intern_string(&mut self, s: &str) -> io::Result<InternedString> {
285        if let Some(&id) = self.string_pool.get(s) {
286            return Ok(InternedString(id));
287        }
288        let id = self.next_pool_id;
289        self.next_pool_id += 1;
290        self.string_pool.insert(s.to_string(), id);
291        codec::encode_string_pool(
292            &[PoolEntry {
293                pool_id: id,
294                data: s.as_bytes().to_vec(),
295            }],
296            &mut self.state.writer,
297        )?;
298        Ok(InternedString(id))
299    }
300
301    pub fn write_string_pool(&mut self, entries: &[PoolEntry]) -> io::Result<()> {
302        codec::encode_string_pool(entries, &mut self.state.writer)
303    }
304
305    /// Flush the underlying writer.
306    pub fn flush(&mut self) -> io::Result<()> {
307        self.state.writer.flush()
308    }
309}
310
311#[cfg(test)]
312mod tests {
313    use super::*;
314    use crate::schema::FieldDef;
315    use crate::types::{FieldType, FieldValue};
316
317    #[test]
318    fn encoder_writes_header() {
319        let enc = Encoder::new();
320        let data = enc.finish();
321        assert_eq!(&data[..5], &[0x54, 0x52, 0x43, 0x00, 1]);
322    }
323
324    #[test]
325    fn encoder_register_and_write_event() {
326        let mut enc = Encoder::new();
327        let schema = enc
328            .register_schema(
329                "Ev",
330                vec![FieldDef {
331                    name: "v".into(),
332                    field_type: FieldType::Varint,
333                }],
334            )
335            .unwrap();
336        enc.write_event(
337            &schema,
338            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
339        )
340        .unwrap();
341        let data = enc.finish();
342        assert!(data.len() > 5);
343    }
344
345    #[test]
346    fn idempotent_re_registration() {
347        let mut enc = Encoder::new();
348        let fields = vec![FieldDef {
349            name: "v".into(),
350            field_type: FieldType::Varint,
351        }];
352        let _s1 = enc.register_schema("Ev", fields.clone()).unwrap();
353        let _s2 = enc.register_schema("Ev", fields).unwrap();
354        // Both succeed — same schema, same name
355    }
356
357    #[test]
358    fn re_registration_different_schema_errors() {
359        let mut enc = Encoder::new();
360        enc.register_schema(
361            "Ev",
362            vec![FieldDef {
363                name: "v".into(),
364                field_type: FieldType::Varint,
365            }],
366        )
367        .unwrap();
368        let result = enc.register_schema(
369            "Ev",
370            vec![FieldDef {
371                name: "different".into(),
372                field_type: FieldType::Bool,
373            }],
374        );
375        assert!(result.is_err());
376    }
377
378    #[test]
379    fn schema_auto_registers_on_write() {
380        use crate::decoder::{DecodedFrame, Decoder};
381
382        // Create a schema without an encoder
383        let schema = Schema::new(
384            "Lazy",
385            vec![FieldDef {
386                name: "v".into(),
387                field_type: FieldType::Varint,
388            }],
389        );
390
391        // Write to an encoder that hasn't seen this schema — auto-registers
392        let mut enc = Encoder::new();
393        enc.write_event(
394            &schema,
395            &[FieldValue::Varint(1_000), FieldValue::Varint(42)],
396        )
397        .unwrap();
398
399        let bytes = enc.finish();
400        let mut dec = Decoder::new(&bytes).unwrap();
401        let frames = dec.decode_all();
402        assert!(matches!(&frames[0], DecodedFrame::Schema(s) if s.name == "Lazy"));
403        if let DecodedFrame::Event { values, .. } = &frames[1] {
404            assert_eq!(*values, vec![FieldValue::Varint(42)]);
405        } else {
406            panic!("expected event");
407        }
408    }
409
410    #[test]
411    fn schema_portable_across_encoders() {
412        use crate::decoder::{DecodedFrame, Decoder};
413
414        let mut enc1 = Encoder::new();
415        let schema = enc1
416            .register_schema(
417                "Shared",
418                vec![FieldDef {
419                    name: "v".into(),
420                    field_type: FieldType::Varint,
421                }],
422            )
423            .unwrap();
424        enc1.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
425            .unwrap();
426
427        // Pass the same Schema to a different encoder
428        let mut enc2 = Encoder::new();
429        enc2.write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
430            .unwrap();
431
432        // Both encoders produce valid output
433        for (enc, expected_val) in [(enc1, 1u64), (enc2, 2u64)] {
434            let bytes = enc.finish();
435            let mut dec = Decoder::new(&bytes).unwrap();
436            let frames = dec.decode_all();
437            let event = frames
438                .iter()
439                .find(|f| matches!(f, DecodedFrame::Event { .. }))
440                .unwrap();
441            if let DecodedFrame::Event { values, .. } = event {
442                assert_eq!(values[0], FieldValue::Varint(expected_val));
443            }
444        }
445    }
446
447    #[test]
448    fn encoder_intern_string_deduplicates() {
449        let mut enc = Encoder::new();
450        let id1 = enc.intern_string("hello").unwrap();
451        let id2 = enc.intern_string("hello").unwrap();
452        let id3 = enc.intern_string("world").unwrap();
453        assert_eq!(id1, id2);
454        assert_ne!(id1, id3);
455    }
456
457    #[test]
458    fn timestamp_round_trip() {
459        use crate::decoder::{DecodedFrame, Decoder};
460
461        let mut enc = Encoder::new();
462        let schema = enc
463            .register_schema(
464                "TS",
465                vec![FieldDef {
466                    name: "v".into(),
467                    field_type: FieldType::Varint,
468                }],
469            )
470            .unwrap();
471
472        let ts1 = 100_000u64;
473        let ts2 = 50_000u64;
474        let ts3 = 200_000_000u64;
475        let ts4 = 100_000_000u64;
476        enc.write_event(&schema, &[FieldValue::Varint(ts1), FieldValue::Varint(1)])
477            .unwrap();
478        enc.write_event(&schema, &[FieldValue::Varint(ts2), FieldValue::Varint(2)])
479            .unwrap();
480        enc.write_event(&schema, &[FieldValue::Varint(ts3), FieldValue::Varint(3)])
481            .unwrap();
482        enc.write_event(&schema, &[FieldValue::Varint(ts4), FieldValue::Varint(4)])
483            .unwrap();
484
485        let bytes = enc.finish();
486        let mut dec = Decoder::new(&bytes).unwrap();
487        let events: Vec<_> = dec
488            .decode_all()
489            .into_iter()
490            .filter_map(|f| match f {
491                DecodedFrame::Event {
492                    timestamp_ns,
493                    values,
494                    ..
495                } => Some((timestamp_ns, values)),
496                _ => None,
497            })
498            .collect();
499
500        assert_eq!(events.len(), 4);
501        assert_eq!(events[0].0, Some(ts1));
502        assert_eq!(events[0].1, vec![FieldValue::Varint(1)]);
503        assert_eq!(events[1].0, Some(ts2));
504        assert_eq!(events[1].1, vec![FieldValue::Varint(2)]);
505        assert_eq!(events[2].0, Some(ts3));
506        assert_eq!(events[2].1, vec![FieldValue::Varint(3)]);
507        assert_eq!(events[3].0, Some(ts4));
508        assert_eq!(events[3].1, vec![FieldValue::Varint(4)]);
509    }
510
511    #[test]
512    fn encoder_new_to_writer() {
513        let mut buf = Vec::new();
514        let enc = Encoder::new_to(&mut buf).unwrap();
515        drop(enc);
516        assert!(buf.len() >= 5);
517        assert_eq!(&buf[..5], &[0x54, 0x52, 0x43, 0x00, 1]);
518    }
519
520    #[test]
521    fn decoder_into_encoder_appends_without_header() {
522        use crate::decoder::{DecodedFrame, Decoder};
523
524        // Create a trace with a header, a schema, and an event
525        let mut enc = Encoder::new();
526        let schema = enc
527            .register_schema(
528                "Ev",
529                vec![FieldDef {
530                    name: "v".into(),
531                    field_type: FieldType::Varint,
532                }],
533            )
534            .unwrap();
535        enc.write_event(&schema, &[FieldValue::Varint(1_000), FieldValue::Varint(1)])
536            .unwrap();
537        let base = enc.finish();
538
539        // Decode all frames, then convert into an encoder that appends to output
540        let mut decoder = Decoder::new(&base).unwrap();
541        while decoder.next_frame_ref().ok().flatten().is_some() {}
542        let mut output = Vec::new();
543        let mut ext = decoder.into_encoder(&mut output);
544        // Schema "Ev" is already known — no duplicate schema frame emitted
545        ext.write_event(&schema, &[FieldValue::Varint(2_000), FieldValue::Varint(2)])
546            .unwrap();
547        drop(ext);
548
549        // Concatenate and decode
550        let mut combined = base.clone();
551        combined.extend_from_slice(&output);
552        let mut dec = Decoder::new(&combined).unwrap();
553        let events: Vec<_> = dec
554            .decode_all()
555            .into_iter()
556            .filter_map(|f| match f {
557                DecodedFrame::Event {
558                    timestamp_ns,
559                    values,
560                    ..
561                } => Some((timestamp_ns, values)),
562                _ => None,
563            })
564            .collect();
565        assert_eq!(events.len(), 2);
566        assert_eq!(events[0].0, Some(1_000));
567        assert_eq!(events[1].0, Some(2_000));
568    }
569
570    #[test]
571    fn decoder_into_encoder_deduplicates_interned_strings() {
572        use crate::decoder::{DecodedFrame, Decoder};
573
574        // Create a trace with an interned string
575        let mut enc = Encoder::new();
576        let id1 = enc.intern_string("hello").unwrap();
577        let base = enc.finish();
578
579        // Decode all frames, then convert into an encoder
580        let mut decoder = Decoder::new(&base).unwrap();
581        while decoder.next_frame_ref().ok().flatten().is_some() {}
582        let mut output = Vec::new();
583        let mut ext = decoder.into_encoder(&mut output);
584        // "hello" is already interned, should reuse the same ID
585        let id2 = ext.intern_string("hello").unwrap();
586        let id3 = ext.intern_string("world").unwrap();
587        drop(ext);
588
589        assert_eq!(id1, id2, "existing string should reuse pool ID");
590        assert_ne!(id2, id3);
591
592        // "hello" should not produce a new StringPool frame; "world" should
593        let mut combined = base.clone();
594        combined.extend_from_slice(&output);
595        let mut dec = Decoder::new(&combined).unwrap();
596        let frames = dec.decode_all();
597        let pool_frames: Vec<_> = frames
598            .iter()
599            .filter(|f| matches!(f, DecodedFrame::StringPool(_)))
600            .collect();
601        // One from the base trace ("hello"), one from extend ("world")
602        assert_eq!(pool_frames.len(), 2);
603    }
604
605    #[test]
606    fn register_and_write() {
607        use crate::decoder::{DecodedFrame, Decoder};
608
609        let mut enc = Encoder::new();
610        let schema = enc
611            .register_schema(
612                "MyEvent",
613                vec![
614                    FieldDef {
615                        name: "count".into(),
616                        field_type: FieldType::Varint,
617                    },
618                    FieldDef {
619                        name: "name".into(),
620                        field_type: FieldType::String,
621                    },
622                ],
623            )
624            .unwrap();
625
626        enc.write_event(
627            &schema,
628            &[
629                FieldValue::Varint(1_000_000),
630                FieldValue::Varint(42),
631                FieldValue::String("hello".into()),
632            ],
633        )
634        .unwrap();
635
636        let bytes = enc.finish();
637        let mut dec = Decoder::new(&bytes).unwrap();
638        let frames = dec.decode_all();
639        let events: Vec<_> = frames
640            .into_iter()
641            .filter_map(|f| match f {
642                DecodedFrame::Event {
643                    timestamp_ns,
644                    values,
645                    ..
646                } => Some((timestamp_ns, values)),
647                _ => None,
648            })
649            .collect();
650        assert_eq!(events.len(), 1);
651        assert_eq!(events[0].0, Some(1_000_000));
652        assert_eq!(events[0].1[0], FieldValue::Varint(42));
653        assert_eq!(events[0].1[1], FieldValue::String("hello".into()));
654    }
655
656    #[test]
657    fn register_conflict_errors() {
658        let mut enc = Encoder::new();
659        enc.register_schema(
660            "Ev",
661            vec![FieldDef {
662                name: "v".into(),
663                field_type: FieldType::Varint,
664            }],
665        )
666        .unwrap();
667        let result = enc.register_schema(
668            "Ev",
669            vec![FieldDef {
670                name: "other".into(),
671                field_type: FieldType::Bool,
672            }],
673        );
674        assert!(result.is_err());
675    }
676
677    #[test]
678    fn write_wrong_field_count_errors() {
679        let mut enc = Encoder::new();
680        let schema = enc
681            .register_schema(
682                "Ev",
683                vec![FieldDef {
684                    name: "v".into(),
685                    field_type: FieldType::Varint,
686                }],
687            )
688            .unwrap();
689        // Pass 3 values (ts + 2 fields) for a 1-field schema
690        let result = enc.write_event(
691            &schema,
692            &[
693                FieldValue::Varint(0),
694                FieldValue::Varint(1),
695                FieldValue::Varint(2),
696            ],
697        );
698        assert!(result.is_err());
699    }
700
701    /// Verify that the encoder advances the timestamp base after each event,
702    /// producing inter-event deltas rather than base-relative deltas.
703    #[test]
704    fn timestamp_base_advances_per_event() {
705        use crate::decoder::{DecodedFrame, Decoder};
706
707        let mut enc = Encoder::new();
708        let schema = enc
709            .register_schema(
710                "Ev",
711                vec![FieldDef {
712                    name: "v".into(),
713                    field_type: FieldType::Varint,
714                }],
715            )
716            .unwrap();
717
718        let ts1 = 12_000_000u64;
719        let ts2 = 24_000_000u64;
720        enc.write_event(&schema, &[FieldValue::Varint(ts1), FieldValue::Varint(1)])
721            .unwrap();
722        enc.write_event(&schema, &[FieldValue::Varint(ts2), FieldValue::Varint(2)])
723            .unwrap();
724
725        let bytes = enc.finish();
726
727        let reset_count = bytes.iter().filter(|&&b| b == 0x05).count();
728        assert_eq!(
729            reset_count, 0,
730            "base should advance per event, avoiding unnecessary resets"
731        );
732
733        let mut dec = Decoder::new(&bytes).unwrap();
734        let events: Vec<_> = dec
735            .decode_all()
736            .into_iter()
737            .filter_map(|f| match f {
738                DecodedFrame::Event { timestamp_ns, .. } => timestamp_ns,
739                _ => None,
740            })
741            .collect();
742        assert_eq!(events, vec![ts1, ts2]);
743    }
744}