1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
use bytecheck::CheckBytes;
use ddshow_types::Event;
use rkyv::{
ser::{serializers::AlignedSerializer, Serializer},
validation::DefaultArchiveValidator,
AlignedVec, Serialize,
};
use std::{fmt::Debug, io::Write, marker::PhantomData, mem};
use timely::dataflow::operators::capture::event::{
Event as TimelyEvent, EventPusher as TimelyEventPusher,
};
#[derive(Debug)]
pub struct EventWriter<T, D, W> {
stream: W,
buffer: AlignedVec,
position: usize,
__type: PhantomData<(T, D)>,
}
impl<T, D, W> EventWriter<T, D, W> {
pub fn new(stream: W) -> Self {
EventWriter {
stream,
buffer: AlignedVec::with_capacity(512),
position: 0,
__type: PhantomData,
}
}
}
impl<T, D, W> TimelyEventPusher<T, D> for EventWriter<T, D, W>
where
W: Write,
T: for<'a> Serialize<AlignedSerializer<&'a mut AlignedVec>> + Debug,
T::Archived: CheckBytes<DefaultArchiveValidator>,
D: for<'a> Serialize<AlignedSerializer<&'a mut AlignedVec>> + Debug,
D::Archived: CheckBytes<DefaultArchiveValidator>,
{
fn push(&mut self, event: TimelyEvent<T, D>) {
let event: Event<T, D> = event.into();
const PADDING: [u8; 15] = [0; 15];
match self.position & 15 {
0 => (),
x => {
let padding = 16 - x;
if let Err(err) = self.stream.write_all(&PADDING[..padding]) {
#[cfg(feature = "tracing")]
tracing_dep::error!(
padding_len = padding,
"failed to write padding to stream: {:?}",
err,
);
return;
}
self.position += padding;
}
}
self.buffer.clear();
let mut serializer = AlignedSerializer::new(&mut self.buffer);
serializer
.serialize_value(&event)
.unwrap_or_else(|unreachable| match unreachable {});
let archive_len = serializer.pos() as u128;
let result = self
.stream
.write_all(&archive_len.to_le_bytes())
.and_then(|_| self.stream.write_all(&self.buffer));
if let Err(err) = result {
#[cfg(feature = "tracing")]
tracing_dep::error!(
archive_len = %archive_len,
"failed to write buffer data to stream: {:?}",
err,
);
return;
}
self.position += mem::size_of::<u128>() + archive_len as usize;
}
}