1use bytecheck::CheckBytes;
2use ddshow_types::Event;
3use rkyv::{
4 ser::{
5 serializers::{
6 AlignedSerializer, AllocScratch, CompositeSerializer, FallbackScratch, HeapScratch,
7 },
8 Serializer,
9 },
10 validation::validators::DefaultValidator,
11 AlignedVec, Infallible, Serialize,
12};
13use std::{
14 fmt::{self, Debug},
15 io::Write,
16 marker::PhantomData,
17 mem,
18};
19use timely::dataflow::operators::capture::event::{
20 Event as TimelyEvent, EventPusher as TimelyEventPusher,
21};
22
23pub type EventSerializer<'a> = CompositeSerializer<
24 AlignedSerializer<&'a mut AlignedVec>,
25 FallbackScratch<HeapScratch<2048>, AllocScratch>,
26 Infallible,
27>;
28
29pub struct EventWriter<T, D, W> {
31 stream: W,
32 buffer: AlignedVec,
33 position: usize,
34 scratch: FallbackScratch<HeapScratch<2048>, AllocScratch>,
35 __type: PhantomData<(T, D)>,
36}
37
38impl<T, D, W> EventWriter<T, D, W> {
39 pub fn new(stream: W) -> Self {
41 EventWriter {
42 stream,
43 buffer: AlignedVec::with_capacity(512),
44 position: 0,
45 scratch: FallbackScratch::default(),
46 __type: PhantomData,
47 }
48 }
49}
50
51impl<T, D, W> TimelyEventPusher<T, D> for EventWriter<T, D, W>
52where
53 W: Write,
54 T: for<'a> Serialize<EventSerializer<'a>> + Debug,
55 T::Archived: for<'a> CheckBytes<DefaultValidator<'a>>,
56 D: for<'a> Serialize<EventSerializer<'a>> + Debug,
57 D::Archived: for<'a> CheckBytes<DefaultValidator<'a>>,
58{
59 fn push(&mut self, event: TimelyEvent<T, D>) {
60 let event: Event<T, D> = event.into();
61
62 const PADDING: [u8; 15] = [0; 15];
64 match self.position & 15 {
65 0 => {}
66 x => {
67 let padding = 16 - x;
68
69 if let Err(err) = self.stream.write_all(&PADDING[..padding]) {
70 #[cfg(feature = "tracing")]
71 tracing_dep::error!(
72 padding_len = padding,
73 "failed to write padding to stream: {:?}",
74 err,
75 );
76
77 return;
78 }
79
80 self.position += padding;
81 }
82 }
83
84 self.buffer.clear();
86
87 let mut serializer = CompositeSerializer::new(
88 AlignedSerializer::new(&mut self.buffer),
89 mem::take(&mut self.scratch),
91 Infallible,
92 );
93
94 if let Err(err) = serializer.serialize_value(&event) {
95 #[cfg(feature = "tracing")]
96 tracing_dep::error!("failed to serialize event: {:?}", err);
97
98 return;
99 }
100
101 let archive_len = serializer.pos() as u128;
102 let (_, scratch, _) = serializer.into_components();
103 debug_assert_eq!(self.buffer.len(), archive_len as usize);
104 self.scratch = scratch;
105
106 let result = self
107 .stream
108 .write_all(&archive_len.to_le_bytes())
110 .and_then(|()| {
111 self.position += mem::size_of::<u128>();
112 self.stream.write_all(&self.buffer)
113 })
114 .map(|()| self.position += archive_len as usize);
115
116 if let Err(err) = result {
117 #[cfg(feature = "tracing")]
118 tracing_dep::error!(
119 archive_len = %archive_len,
120 "failed to write buffer data to stream: {:?}",
121 err,
122 );
123 }
124 }
125}
126
127impl<T, D, W> Debug for EventWriter<T, D, W> {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 f.debug_struct("EventWriter")
131 .field("stream", &(&self.stream as *const W))
132 .field("buffer", &self.buffer)
133 .field("position", &self.position)
134 .field("fallback", &(&self.scratch as *const _))
135 .finish()
136 }
137}