vortex_ipc/messages/
encoder.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use bytes::{Bytes, BytesMut};
5use flatbuffers::FlatBufferBuilder;
6use vortex_array::serde::SerializeOptions;
7use vortex_array::{Array, ArrayContext};
8use vortex_buffer::ByteBuffer;
9use vortex_dtype::DType;
10use vortex_error::VortexExpect;
11use vortex_flatbuffers::{FlatBuffer, WriteFlatBufferExt, message as fb};
12
13/// An IPC message ready to be passed to the encoder.
14pub enum EncoderMessage<'a> {
15    Array(&'a dyn Array),
16    Buffer(&'a ByteBuffer),
17    DType(&'a DType),
18}
19
20pub struct MessageEncoder {
21    /// A reusable buffer of zeros used for padding.
22    zeros: Bytes,
23}
24
25impl Default for MessageEncoder {
26    fn default() -> Self {
27        Self {
28            zeros: BytesMut::zeroed(u16::MAX as usize).freeze(),
29        }
30    }
31}
32
33impl MessageEncoder {
34    /// Encode an IPC message for writing to a byte stream.
35    ///
36    /// The returned buffers should be written contiguously to the stream.
37    pub fn encode(&mut self, message: EncoderMessage) -> Vec<Bytes> {
38        let mut buffers = vec![];
39
40        // We'll push one buffer as a placeholder for the flatbuffer message length, and one
41        // for the flatbuffer itself.
42        buffers.push(self.zeros.clone());
43        buffers.push(self.zeros.clone());
44
45        // We initialize the flatbuffer builder with a 4-byte vector that we will use to store
46        // the flatbuffer length into. By passing this vector into the FlatBufferBuilder, the
47        // flatbuffers internal alignment mechanisms will handle everything else for us.
48        // TODO(ngates): again, this a ton of padding...
49        let mut fbb = FlatBufferBuilder::from_vec(vec![0u8; 4]);
50
51        let (header, body_len) = match message {
52            EncoderMessage::Array(array) => {
53                // Currently we include a Context in every message. We could convert this to
54                // sending deltas later.
55                let ctx = ArrayContext::empty();
56                let array_buffers = array
57                    .serialize(&ctx, &SerializeOptions::default())
58                    // TODO(ngates): we should propagate this somehow
59                    .vortex_expect("Array serialization failed");
60                let body_len = array_buffers.iter().map(|b| b.len() as u64).sum::<u64>();
61
62                let array_encodings = ctx
63                    .encodings()
64                    .iter()
65                    .map(|e| fbb.create_string(e.id().as_ref()))
66                    .collect::<Vec<_>>();
67                let array_encodings = fbb.create_vector(array_encodings.as_slice());
68
69                let header = fb::ArrayMessage::create(
70                    &mut fbb,
71                    &fb::ArrayMessageArgs {
72                        row_count: u32::try_from(array.len())
73                            .vortex_expect("Array length must fit into u32"),
74                        encodings: Some(array_encodings),
75                    },
76                )
77                .as_union_value();
78
79                buffers.extend(array_buffers.into_iter().map(|b| b.into_inner()));
80
81                (header, body_len)
82            }
83            EncoderMessage::Buffer(buffer) => {
84                let header = fb::BufferMessage::create(
85                    &mut fbb,
86                    &fb::BufferMessageArgs {
87                        alignment_exponent: buffer.alignment().exponent(),
88                    },
89                )
90                .as_union_value();
91                let body_len = buffer.len() as u64;
92                buffers.push(buffer.clone().into_inner());
93
94                (header, body_len)
95            }
96            EncoderMessage::DType(dtype) => {
97                let header =
98                    fb::DTypeMessage::create(&mut fbb, &fb::DTypeMessageArgs {}).as_union_value();
99
100                let buffer = dtype.write_flatbuffer_bytes().into_inner().into_inner();
101                let body_len = buffer.len() as u64;
102                buffers.push(buffer);
103
104                (header, body_len)
105            }
106        };
107
108        let mut msg = fb::MessageBuilder::new(&mut fbb);
109        msg.add_version(Default::default());
110        msg.add_header_type(match message {
111            EncoderMessage::Array(_) => fb::MessageHeader::ArrayMessage,
112            EncoderMessage::Buffer(_) => fb::MessageHeader::BufferMessage,
113            EncoderMessage::DType(_) => fb::MessageHeader::DTypeMessage,
114        });
115        msg.add_header(header);
116        msg.add_body_size(body_len);
117        let msg = msg.finish();
118
119        // Finish the flatbuffer and swap it out for the placeholder buffer.
120        fbb.finish_minimal(msg);
121        let (fbv, pos) = fbb.collapse();
122        let fb_buffer = FlatBuffer::copy_from(&fbv[pos..]);
123        let fb_buffer_len = u32::try_from(fb_buffer.len())
124            .vortex_expect("IPC flatbuffer headers must fit into u32 bytes");
125
126        buffers[0] = Bytes::from(fb_buffer_len.to_le_bytes().to_vec());
127        buffers[1] = fb_buffer.into_inner().into_inner();
128
129        buffers
130    }
131}