Skip to main content

vortex_ipc/messages/
encoder.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use bytes::Bytes;
5use bytes::BytesMut;
6use flatbuffers::FlatBufferBuilder;
7use vortex_array::Array;
8use vortex_array::ArrayContext;
9use vortex_array::dtype::DType;
10use vortex_array::serde::SerializeOptions;
11use vortex_buffer::ByteBuffer;
12use vortex_error::VortexResult;
13use vortex_error::vortex_err;
14use vortex_flatbuffers::FlatBuffer;
15use vortex_flatbuffers::WriteFlatBufferExt;
16use vortex_flatbuffers::message as fb;
17
18/// An IPC message ready to be passed to the encoder.
19pub enum EncoderMessage<'a> {
20    Array(&'a dyn Array),
21    Buffer(&'a ByteBuffer),
22    DType(&'a DType),
23}
24
25pub struct MessageEncoder {
26    /// A reusable buffer of zeros used for padding.
27    zeros: Bytes,
28}
29
30impl Default for MessageEncoder {
31    fn default() -> Self {
32        Self {
33            zeros: BytesMut::zeroed(u16::MAX as usize).freeze(),
34        }
35    }
36}
37
38impl MessageEncoder {
39    /// Encode an IPC message for writing to a byte stream.
40    ///
41    /// The returned buffers should be written contiguously to the stream.
42    pub fn encode(&mut self, message: EncoderMessage) -> VortexResult<Vec<Bytes>> {
43        let mut buffers = vec![];
44
45        // We'll push one buffer as a placeholder for the flatbuffer message length, and one
46        // for the flatbuffer itself.
47        buffers.push(self.zeros.clone());
48        buffers.push(self.zeros.clone());
49
50        // We initialize the flatbuffer builder with a 4-byte vector that we will use to store
51        // the flatbuffer length into. By passing this vector into the FlatBufferBuilder, the
52        // flatbuffers internal alignment mechanisms will handle everything else for us.
53        // TODO(ngates): again, this a ton of padding...
54        let mut fbb = FlatBufferBuilder::from_vec(vec![0u8; 4]);
55
56        let (header, body_len) = match message {
57            EncoderMessage::Array(array) => {
58                // Currently we include a Context in every message. We could convert this to
59                // sending deltas later.
60                let ctx = ArrayContext::empty();
61
62                let array_buffers = array.serialize(&ctx, &SerializeOptions::default())?;
63                let body_len = array_buffers.iter().map(|b| b.len() as u64).sum::<u64>();
64
65                let array_encodings = ctx
66                    .to_ids()
67                    .iter()
68                    .map(|e| fbb.create_string(e.as_ref()))
69                    .collect::<Vec<_>>();
70                let array_encodings = fbb.create_vector(array_encodings.as_slice());
71
72                let header = fb::ArrayMessage::create(
73                    &mut fbb,
74                    &fb::ArrayMessageArgs {
75                        row_count: u32::try_from(array.len())
76                            .map_err(|_| vortex_err!("Array length must fit into u32"))?,
77                        encodings: Some(array_encodings),
78                    },
79                )
80                .as_union_value();
81
82                buffers.extend(array_buffers.into_iter().map(|b| b.into_inner()));
83
84                (header, body_len)
85            }
86            EncoderMessage::Buffer(buffer) => {
87                let header = fb::BufferMessage::create(
88                    &mut fbb,
89                    &fb::BufferMessageArgs {
90                        alignment_exponent: buffer.alignment().exponent(),
91                    },
92                )
93                .as_union_value();
94                let body_len = buffer.len() as u64;
95                buffers.push(buffer.clone().into_inner());
96
97                (header, body_len)
98            }
99            EncoderMessage::DType(dtype) => {
100                let header =
101                    fb::DTypeMessage::create(&mut fbb, &fb::DTypeMessageArgs {}).as_union_value();
102
103                let buffer = dtype.write_flatbuffer_bytes()?.into_inner().into_inner();
104                let body_len = buffer.len() as u64;
105                buffers.push(buffer);
106
107                (header, body_len)
108            }
109        };
110
111        let mut msg = fb::MessageBuilder::new(&mut fbb);
112        msg.add_version(Default::default());
113        msg.add_header_type(match message {
114            EncoderMessage::Array(_) => fb::MessageHeader::ArrayMessage,
115            EncoderMessage::Buffer(_) => fb::MessageHeader::BufferMessage,
116            EncoderMessage::DType(_) => fb::MessageHeader::DTypeMessage,
117        });
118        msg.add_header(header);
119        msg.add_body_size(body_len);
120        let msg = msg.finish();
121
122        // Finish the flatbuffer and swap it out for the placeholder buffer.
123        fbb.finish_minimal(msg);
124        let (fbv, pos) = fbb.collapse();
125        let fb_buffer = FlatBuffer::copy_from(&fbv[pos..]);
126        let fb_buffer_len = u32::try_from(fb_buffer.len())
127            .map_err(|_| vortex_err!("Array flatbuffer length must fit into u32"))?;
128
129        buffers[0] = Bytes::from(fb_buffer_len.to_le_bytes().to_vec());
130        buffers[1] = fb_buffer.into_inner().into_inner();
131
132        Ok(buffers)
133    }
134}