// vortex_ipc/messages/encoder.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use bytes::Bytes;
5use bytes::BytesMut;
6use flatbuffers::FlatBufferBuilder;
7use vortex_array::Array;
8use vortex_array::ArrayContext;
9use vortex_array::serde::SerializeOptions;
10use vortex_buffer::ByteBuffer;
11use vortex_dtype::DType;
12use vortex_error::VortexExpect;
13use vortex_flatbuffers::FlatBuffer;
14use vortex_flatbuffers::WriteFlatBufferExt;
15use vortex_flatbuffers::message as fb;
16
17/// An IPC message ready to be passed to the encoder.
18pub enum EncoderMessage<'a> {
19    Array(&'a dyn Array),
20    Buffer(&'a ByteBuffer),
21    DType(&'a DType),
22}
23
24pub struct MessageEncoder {
25    /// A reusable buffer of zeros used for padding.
26    zeros: Bytes,
27}
28
29impl Default for MessageEncoder {
30    fn default() -> Self {
31        Self {
32            zeros: BytesMut::zeroed(u16::MAX as usize).freeze(),
33        }
34    }
35}
36
37impl MessageEncoder {
38    /// Encode an IPC message for writing to a byte stream.
39    ///
40    /// The returned buffers should be written contiguously to the stream.
41    pub fn encode(&mut self, message: EncoderMessage) -> Vec<Bytes> {
42        let mut buffers = vec![];
43
44        // We'll push one buffer as a placeholder for the flatbuffer message length, and one
45        // for the flatbuffer itself.
46        buffers.push(self.zeros.clone());
47        buffers.push(self.zeros.clone());
48
49        // We initialize the flatbuffer builder with a 4-byte vector that we will use to store
50        // the flatbuffer length into. By passing this vector into the FlatBufferBuilder, the
51        // flatbuffers internal alignment mechanisms will handle everything else for us.
52        // TODO(ngates): again, this a ton of padding...
53        let mut fbb = FlatBufferBuilder::from_vec(vec![0u8; 4]);
54
55        let (header, body_len) = match message {
56            EncoderMessage::Array(array) => {
57                // Currently we include a Context in every message. We could convert this to
58                // sending deltas later.
59                let ctx = ArrayContext::empty();
60                let array_buffers = array
61                    .serialize(&ctx, &SerializeOptions::default())
62                    // TODO(ngates): we should propagate this somehow
63                    .vortex_expect("Array serialization failed");
64                let body_len = array_buffers.iter().map(|b| b.len() as u64).sum::<u64>();
65
66                let array_encodings = ctx
67                    .encodings()
68                    .iter()
69                    .map(|e| fbb.create_string(e.id().as_ref()))
70                    .collect::<Vec<_>>();
71                let array_encodings = fbb.create_vector(array_encodings.as_slice());
72
73                let header = fb::ArrayMessage::create(
74                    &mut fbb,
75                    &fb::ArrayMessageArgs {
76                        row_count: u32::try_from(array.len())
77                            .vortex_expect("Array length must fit into u32"),
78                        encodings: Some(array_encodings),
79                    },
80                )
81                .as_union_value();
82
83                buffers.extend(array_buffers.into_iter().map(|b| b.into_inner()));
84
85                (header, body_len)
86            }
87            EncoderMessage::Buffer(buffer) => {
88                let header = fb::BufferMessage::create(
89                    &mut fbb,
90                    &fb::BufferMessageArgs {
91                        alignment_exponent: buffer.alignment().exponent(),
92                    },
93                )
94                .as_union_value();
95                let body_len = buffer.len() as u64;
96                buffers.push(buffer.clone().into_inner());
97
98                (header, body_len)
99            }
100            EncoderMessage::DType(dtype) => {
101                let header =
102                    fb::DTypeMessage::create(&mut fbb, &fb::DTypeMessageArgs {}).as_union_value();
103
104                let buffer = dtype.write_flatbuffer_bytes().into_inner().into_inner();
105                let body_len = buffer.len() as u64;
106                buffers.push(buffer);
107
108                (header, body_len)
109            }
110        };
111
112        let mut msg = fb::MessageBuilder::new(&mut fbb);
113        msg.add_version(Default::default());
114        msg.add_header_type(match message {
115            EncoderMessage::Array(_) => fb::MessageHeader::ArrayMessage,
116            EncoderMessage::Buffer(_) => fb::MessageHeader::BufferMessage,
117            EncoderMessage::DType(_) => fb::MessageHeader::DTypeMessage,
118        });
119        msg.add_header(header);
120        msg.add_body_size(body_len);
121        let msg = msg.finish();
122
123        // Finish the flatbuffer and swap it out for the placeholder buffer.
124        fbb.finish_minimal(msg);
125        let (fbv, pos) = fbb.collapse();
126        let fb_buffer = FlatBuffer::copy_from(&fbv[pos..]);
127        let fb_buffer_len = u32::try_from(fb_buffer.len())
128            .vortex_expect("IPC flatbuffer headers must fit into u32 bytes");
129
130        buffers[0] = Bytes::from(fb_buffer_len.to_le_bytes().to_vec());
131        buffers[1] = fb_buffer.into_inner().into_inner();
132
133        buffers
134    }
135}