vortex_ipc/messages/
encoder.rs1use bytes::Bytes;
5use bytes::BytesMut;
6use flatbuffers::FlatBufferBuilder;
7use vortex_array::Array;
8use vortex_array::ArrayContext;
9use vortex_array::serde::SerializeOptions;
10use vortex_buffer::ByteBuffer;
11use vortex_dtype::DType;
12use vortex_error::VortexExpect;
13use vortex_flatbuffers::FlatBuffer;
14use vortex_flatbuffers::WriteFlatBufferExt;
15use vortex_flatbuffers::message as fb;
16
17pub enum EncoderMessage<'a> {
19 Array(&'a dyn Array),
20 Buffer(&'a ByteBuffer),
21 DType(&'a DType),
22}
23
24pub struct MessageEncoder {
25 zeros: Bytes,
27}
28
29impl Default for MessageEncoder {
30 fn default() -> Self {
31 Self {
32 zeros: BytesMut::zeroed(u16::MAX as usize).freeze(),
33 }
34 }
35}
36
37impl MessageEncoder {
38 pub fn encode(&mut self, message: EncoderMessage) -> Vec<Bytes> {
42 let mut buffers = vec![];
43
44 buffers.push(self.zeros.clone());
47 buffers.push(self.zeros.clone());
48
49 let mut fbb = FlatBufferBuilder::from_vec(vec![0u8; 4]);
54
55 let (header, body_len) = match message {
56 EncoderMessage::Array(array) => {
57 let ctx = ArrayContext::empty();
60 let array_buffers = array
61 .serialize(&ctx, &SerializeOptions::default())
62 .vortex_expect("Array serialization failed");
64 let body_len = array_buffers.iter().map(|b| b.len() as u64).sum::<u64>();
65
66 let array_encodings = ctx
67 .encodings()
68 .iter()
69 .map(|e| fbb.create_string(e.id().as_ref()))
70 .collect::<Vec<_>>();
71 let array_encodings = fbb.create_vector(array_encodings.as_slice());
72
73 let header = fb::ArrayMessage::create(
74 &mut fbb,
75 &fb::ArrayMessageArgs {
76 row_count: u32::try_from(array.len())
77 .vortex_expect("Array length must fit into u32"),
78 encodings: Some(array_encodings),
79 },
80 )
81 .as_union_value();
82
83 buffers.extend(array_buffers.into_iter().map(|b| b.into_inner()));
84
85 (header, body_len)
86 }
87 EncoderMessage::Buffer(buffer) => {
88 let header = fb::BufferMessage::create(
89 &mut fbb,
90 &fb::BufferMessageArgs {
91 alignment_exponent: buffer.alignment().exponent(),
92 },
93 )
94 .as_union_value();
95 let body_len = buffer.len() as u64;
96 buffers.push(buffer.clone().into_inner());
97
98 (header, body_len)
99 }
100 EncoderMessage::DType(dtype) => {
101 let header =
102 fb::DTypeMessage::create(&mut fbb, &fb::DTypeMessageArgs {}).as_union_value();
103
104 let buffer = dtype.write_flatbuffer_bytes().into_inner().into_inner();
105 let body_len = buffer.len() as u64;
106 buffers.push(buffer);
107
108 (header, body_len)
109 }
110 };
111
112 let mut msg = fb::MessageBuilder::new(&mut fbb);
113 msg.add_version(Default::default());
114 msg.add_header_type(match message {
115 EncoderMessage::Array(_) => fb::MessageHeader::ArrayMessage,
116 EncoderMessage::Buffer(_) => fb::MessageHeader::BufferMessage,
117 EncoderMessage::DType(_) => fb::MessageHeader::DTypeMessage,
118 });
119 msg.add_header(header);
120 msg.add_body_size(body_len);
121 let msg = msg.finish();
122
123 fbb.finish_minimal(msg);
125 let (fbv, pos) = fbb.collapse();
126 let fb_buffer = FlatBuffer::copy_from(&fbv[pos..]);
127 let fb_buffer_len = u32::try_from(fb_buffer.len())
128 .vortex_expect("IPC flatbuffer headers must fit into u32 bytes");
129
130 buffers[0] = Bytes::from(fb_buffer_len.to_le_bytes().to_vec());
131 buffers[1] = fb_buffer.into_inner().into_inner();
132
133 buffers
134 }
135}