vortex_ipc/messages/
encoder.rs

1use bytes::{Bytes, BytesMut};
2use flatbuffers::FlatBufferBuilder;
3use vortex_array::serde::SerializeOptions;
4use vortex_array::{Array, ArrayContext};
5use vortex_buffer::ByteBuffer;
6use vortex_dtype::DType;
7use vortex_error::VortexExpect;
8use vortex_flatbuffers::{FlatBuffer, WriteFlatBufferExt, message as fb};
9
10/// An IPC message ready to be passed to the encoder.
11pub enum EncoderMessage<'a> {
12    Array(&'a dyn Array),
13    Buffer(&'a ByteBuffer),
14    DType(&'a DType),
15}
16
17pub struct MessageEncoder {
18    /// A reusable buffer of zeros used for padding.
19    zeros: Bytes,
20}
21
22impl Default for MessageEncoder {
23    fn default() -> Self {
24        Self {
25            zeros: BytesMut::zeroed(u16::MAX as usize).freeze(),
26        }
27    }
28}
29
30impl MessageEncoder {
31    /// Encode an IPC message for writing to a byte stream.
32    ///
33    /// The returned buffers should be written contiguously to the stream.
34    pub fn encode(&mut self, message: EncoderMessage) -> Vec<Bytes> {
35        let mut buffers = vec![];
36
37        // We'll push one buffer as a placeholder for the flatbuffer message length, and one
38        // for the flatbuffer itself.
39        buffers.push(self.zeros.clone());
40        buffers.push(self.zeros.clone());
41
42        // We initialize the flatbuffer builder with a 4-byte vector that we will use to store
43        // the flatbuffer length into. By passing this vector into the FlatBufferBuilder, the
44        // flatbuffers internal alignment mechanisms will handle everything else for us.
45        // TODO(ngates): again, this a ton of padding...
46        let mut fbb = FlatBufferBuilder::from_vec(vec![0u8; 4]);
47
48        let (header, body_len) = match message {
49            EncoderMessage::Array(array) => {
50                // Currently we include a Context in every message. We could convert this to
51                // sending deltas later.
52                let ctx = ArrayContext::empty();
53                let array_buffers = array
54                    .serialize(&ctx, &SerializeOptions::default())
55                    // TODO(ngates): we should propagate this somehow
56                    .vortex_expect("Array serialization failed");
57                let body_len = array_buffers.iter().map(|b| b.len() as u64).sum::<u64>();
58
59                let array_encodings = ctx
60                    .encodings()
61                    .iter()
62                    .map(|e| fbb.create_string(e.id().as_ref()))
63                    .collect::<Vec<_>>();
64                let array_encodings = fbb.create_vector(array_encodings.as_slice());
65
66                let header = fb::ArrayMessage::create(
67                    &mut fbb,
68                    &fb::ArrayMessageArgs {
69                        row_count: u32::try_from(array.len())
70                            .vortex_expect("Array length must fit into u32"),
71                        encodings: Some(array_encodings),
72                    },
73                )
74                .as_union_value();
75
76                buffers.extend(array_buffers.into_iter().map(|b| b.into_inner()));
77
78                (header, body_len)
79            }
80            EncoderMessage::Buffer(buffer) => {
81                let header = fb::BufferMessage::create(
82                    &mut fbb,
83                    &fb::BufferMessageArgs {
84                        alignment_exponent: buffer.alignment().exponent(),
85                    },
86                )
87                .as_union_value();
88                let body_len = buffer.len() as u64;
89                buffers.push(buffer.clone().into_inner());
90
91                (header, body_len)
92            }
93            EncoderMessage::DType(dtype) => {
94                let header =
95                    fb::DTypeMessage::create(&mut fbb, &fb::DTypeMessageArgs {}).as_union_value();
96
97                let buffer = dtype.write_flatbuffer_bytes().into_inner().into_inner();
98                let body_len = buffer.len() as u64;
99                buffers.push(buffer);
100
101                (header, body_len)
102            }
103        };
104
105        let mut msg = fb::MessageBuilder::new(&mut fbb);
106        msg.add_version(Default::default());
107        msg.add_header_type(match message {
108            EncoderMessage::Array(_) => fb::MessageHeader::ArrayMessage,
109            EncoderMessage::Buffer(_) => fb::MessageHeader::BufferMessage,
110            EncoderMessage::DType(_) => fb::MessageHeader::DTypeMessage,
111        });
112        msg.add_header(header);
113        msg.add_body_size(body_len);
114        let msg = msg.finish();
115
116        // Finish the flatbuffer and swap it out for the placeholder buffer.
117        fbb.finish_minimal(msg);
118        let (fbv, pos) = fbb.collapse();
119        let fb_buffer = FlatBuffer::copy_from(&fbv[pos..]);
120        let fb_buffer_len = u32::try_from(fb_buffer.len())
121            .vortex_expect("IPC flatbuffer headers must fit into u32 bytes");
122
123        buffers[0] = Bytes::from(fb_buffer_len.to_le_bytes().to_vec());
124        buffers[1] = fb_buffer.into_inner().into_inner();
125
126        buffers
127    }
128}