ddshow_sink/
writer.rs

1use bytecheck::CheckBytes;
2use ddshow_types::Event;
3use rkyv::{
4    ser::{
5        serializers::{
6            AlignedSerializer, AllocScratch, CompositeSerializer, FallbackScratch, HeapScratch,
7        },
8        Serializer,
9    },
10    validation::validators::DefaultValidator,
11    AlignedVec, Infallible, Serialize,
12};
13use std::{
14    fmt::{self, Debug},
15    io::Write,
16    marker::PhantomData,
17    mem,
18};
19use timely::dataflow::operators::capture::event::{
20    Event as TimelyEvent, EventPusher as TimelyEventPusher,
21};
22
/// The [`rkyv`] serializer type that event and timestamp types must be
/// serializable with in order to be pushed through an [`EventWriter`].
///
/// It is a [`CompositeSerializer`] made of three parts:
/// - an [`AlignedSerializer`] that writes into a mutably borrowed
///   [`AlignedVec`] for the lifetime `'a`,
/// - a [`FallbackScratch`] that pairs a `HeapScratch<2048>` with an
///   [`AllocScratch`] fallback,
/// - [`Infallible`] as the third component.
pub type EventSerializer<'a> = CompositeSerializer<
    AlignedSerializer<&'a mut AlignedVec>,
    FallbackScratch<HeapScratch<2048>, AllocScratch>,
    Infallible,
>;
28
/// A wrapper for a writer that serializes [`rkyv`] encoded types that are FFI compatible
///
/// Every pushed event is emitted to the wrapped stream as:
/// 1. zero bytes of padding until the writer's byte count is a multiple of 16,
/// 2. the archive length as a little-endian `u128` (16 bytes),
/// 3. the serialized archive itself.
///
/// `T` and `D` are the timestamp and data types of the timely events being
/// written; they only appear in the type, no values of them are stored.
pub struct EventWriter<T, D, W> {
    /// The underlying writer that padding, length headers and archives are written to.
    stream: W,
    /// Reusable buffer each event is serialized into before it is written to `stream`.
    buffer: AlignedVec,
    /// Running count of bytes this writer has recorded as written to `stream`,
    /// used to work out how much alignment padding the next event needs.
    position: usize,
    /// Scratch space lent to the serializer for each event and stored back afterwards.
    scratch: FallbackScratch<HeapScratch<2048>, AllocScratch>,
    /// Marker binding the writer to its `T` and `D` type parameters.
    __type: PhantomData<(T, D)>,
}
37
38impl<T, D, W> EventWriter<T, D, W> {
39    /// Allocates a new `EventWriter` wrapping a supplied writer.
40    pub fn new(stream: W) -> Self {
41        EventWriter {
42            stream,
43            buffer: AlignedVec::with_capacity(512),
44            position: 0,
45            scratch: FallbackScratch::default(),
46            __type: PhantomData,
47        }
48    }
49}
50
51impl<T, D, W> TimelyEventPusher<T, D> for EventWriter<T, D, W>
52where
53    W: Write,
54    T: for<'a> Serialize<EventSerializer<'a>> + Debug,
55    T::Archived: for<'a> CheckBytes<DefaultValidator<'a>>,
56    D: for<'a> Serialize<EventSerializer<'a>> + Debug,
57    D::Archived: for<'a> CheckBytes<DefaultValidator<'a>>,
58{
59    fn push(&mut self, event: TimelyEvent<T, D>) {
60        let event: Event<T, D> = event.into();
61
62        // Align to 16
63        const PADDING: [u8; 15] = [0; 15];
64        match self.position & 15 {
65            0 => {}
66            x => {
67                let padding = 16 - x;
68
69                if let Err(err) = self.stream.write_all(&PADDING[..padding]) {
70                    #[cfg(feature = "tracing")]
71                    tracing_dep::error!(
72                        padding_len = padding,
73                        "failed to write padding to stream: {:?}",
74                        err,
75                    );
76
77                    return;
78                }
79
80                self.position += padding;
81            }
82        }
83
84        // Write archive
85        self.buffer.clear();
86
87        let mut serializer = CompositeSerializer::new(
88            AlignedSerializer::new(&mut self.buffer),
89            // FIXME: Get implementations to allow using `&mut`s within `CompositeSerializer`
90            mem::take(&mut self.scratch),
91            Infallible,
92        );
93
94        if let Err(err) = serializer.serialize_value(&event) {
95            #[cfg(feature = "tracing")]
96            tracing_dep::error!("failed to serialize event: {:?}", err);
97
98            return;
99        }
100
101        let archive_len = serializer.pos() as u128;
102        let (_, scratch, _) = serializer.into_components();
103        debug_assert_eq!(self.buffer.len(), archive_len as usize);
104        self.scratch = scratch;
105
106        let result = self
107            .stream
108            // This will keep 16-byte alignment because archive_len is a u128
109            .write_all(&archive_len.to_le_bytes())
110            .and_then(|()| {
111                self.position += mem::size_of::<u128>();
112                self.stream.write_all(&self.buffer)
113            })
114            .map(|()| self.position += archive_len as usize);
115
116        if let Err(err) = result {
117            #[cfg(feature = "tracing")]
118            tracing_dep::error!(
119                archive_len = %archive_len,
120                "failed to write buffer data to stream: {:?}",
121                err,
122            );
123        }
124    }
125}
126
127// FIXME: https://github.com/djkoloski/rkyv/issues/173
128impl<T, D, W> Debug for EventWriter<T, D, W> {
129    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130        f.debug_struct("EventWriter")
131            .field("stream", &(&self.stream as *const W))
132            .field("buffer", &self.buffer)
133            .field("position", &self.position)
134            .field("fallback", &(&self.scratch as *const _))
135            .finish()
136    }
137}