vortex_ipc/messages/
encoder.rs1use bytes::{Bytes, BytesMut};
5use flatbuffers::FlatBufferBuilder;
6use vortex_array::serde::SerializeOptions;
7use vortex_array::{Array, ArrayContext};
8use vortex_buffer::ByteBuffer;
9use vortex_dtype::DType;
10use vortex_error::VortexExpect;
11use vortex_flatbuffers::{FlatBuffer, WriteFlatBufferExt, message as fb};
12
13pub enum EncoderMessage<'a> {
15 Array(&'a dyn Array),
16 Buffer(&'a ByteBuffer),
17 DType(&'a DType),
18}
19
20pub struct MessageEncoder {
21 zeros: Bytes,
23}
24
25impl Default for MessageEncoder {
26 fn default() -> Self {
27 Self {
28 zeros: BytesMut::zeroed(u16::MAX as usize).freeze(),
29 }
30 }
31}
32
33impl MessageEncoder {
34 pub fn encode(&mut self, message: EncoderMessage) -> Vec<Bytes> {
38 let mut buffers = vec![];
39
40 buffers.push(self.zeros.clone());
43 buffers.push(self.zeros.clone());
44
45 let mut fbb = FlatBufferBuilder::from_vec(vec![0u8; 4]);
50
51 let (header, body_len) = match message {
52 EncoderMessage::Array(array) => {
53 let ctx = ArrayContext::empty();
56 let array_buffers = array
57 .serialize(&ctx, &SerializeOptions::default())
58 .vortex_expect("Array serialization failed");
60 let body_len = array_buffers.iter().map(|b| b.len() as u64).sum::<u64>();
61
62 let array_encodings = ctx
63 .encodings()
64 .iter()
65 .map(|e| fbb.create_string(e.id().as_ref()))
66 .collect::<Vec<_>>();
67 let array_encodings = fbb.create_vector(array_encodings.as_slice());
68
69 let header = fb::ArrayMessage::create(
70 &mut fbb,
71 &fb::ArrayMessageArgs {
72 row_count: u32::try_from(array.len())
73 .vortex_expect("Array length must fit into u32"),
74 encodings: Some(array_encodings),
75 },
76 )
77 .as_union_value();
78
79 buffers.extend(array_buffers.into_iter().map(|b| b.into_inner()));
80
81 (header, body_len)
82 }
83 EncoderMessage::Buffer(buffer) => {
84 let header = fb::BufferMessage::create(
85 &mut fbb,
86 &fb::BufferMessageArgs {
87 alignment_exponent: buffer.alignment().exponent(),
88 },
89 )
90 .as_union_value();
91 let body_len = buffer.len() as u64;
92 buffers.push(buffer.clone().into_inner());
93
94 (header, body_len)
95 }
96 EncoderMessage::DType(dtype) => {
97 let header =
98 fb::DTypeMessage::create(&mut fbb, &fb::DTypeMessageArgs {}).as_union_value();
99
100 let buffer = dtype.write_flatbuffer_bytes().into_inner().into_inner();
101 let body_len = buffer.len() as u64;
102 buffers.push(buffer);
103
104 (header, body_len)
105 }
106 };
107
108 let mut msg = fb::MessageBuilder::new(&mut fbb);
109 msg.add_version(Default::default());
110 msg.add_header_type(match message {
111 EncoderMessage::Array(_) => fb::MessageHeader::ArrayMessage,
112 EncoderMessage::Buffer(_) => fb::MessageHeader::BufferMessage,
113 EncoderMessage::DType(_) => fb::MessageHeader::DTypeMessage,
114 });
115 msg.add_header(header);
116 msg.add_body_size(body_len);
117 let msg = msg.finish();
118
119 fbb.finish_minimal(msg);
121 let (fbv, pos) = fbb.collapse();
122 let fb_buffer = FlatBuffer::copy_from(&fbv[pos..]);
123 let fb_buffer_len = u32::try_from(fb_buffer.len())
124 .vortex_expect("IPC flatbuffer headers must fit into u32 bytes");
125
126 buffers[0] = Bytes::from(fb_buffer_len.to_le_bytes().to_vec());
127 buffers[1] = fb_buffer.into_inner().into_inner();
128
129 buffers
130 }
131}