vortex_ipc/messages/
encoder.rs1use bytes::{Bytes, BytesMut};
2use flatbuffers::FlatBufferBuilder;
3use vortex_array::serde::SerializeOptions;
4use vortex_array::{Array, ArrayContext};
5use vortex_buffer::ByteBuffer;
6use vortex_dtype::DType;
7use vortex_error::VortexExpect;
8use vortex_flatbuffers::{FlatBuffer, WriteFlatBufferExt, message as fb};
9
10pub enum EncoderMessage<'a> {
12 Array(&'a dyn Array),
13 Buffer(&'a ByteBuffer),
14 DType(&'a DType),
15}
16
17pub struct MessageEncoder {
18 zeros: Bytes,
20}
21
22impl Default for MessageEncoder {
23 fn default() -> Self {
24 Self {
25 zeros: BytesMut::zeroed(u16::MAX as usize).freeze(),
26 }
27 }
28}
29
30impl MessageEncoder {
31 pub fn encode(&mut self, message: EncoderMessage) -> Vec<Bytes> {
35 let mut buffers = vec![];
36
37 buffers.push(self.zeros.clone());
40 buffers.push(self.zeros.clone());
41
42 let mut fbb = FlatBufferBuilder::from_vec(vec![0u8; 4]);
47
48 let (header, body_len) = match message {
49 EncoderMessage::Array(array) => {
50 let ctx = ArrayContext::empty();
53 let array_buffers = array
54 .serialize(&ctx, &SerializeOptions::default())
55 .vortex_expect("Array serialization failed");
57 let body_len = array_buffers.iter().map(|b| b.len() as u64).sum::<u64>();
58
59 let array_encodings = ctx
60 .encodings()
61 .iter()
62 .map(|e| fbb.create_string(e.id().as_ref()))
63 .collect::<Vec<_>>();
64 let array_encodings = fbb.create_vector(array_encodings.as_slice());
65
66 let header = fb::ArrayMessage::create(
67 &mut fbb,
68 &fb::ArrayMessageArgs {
69 row_count: u32::try_from(array.len())
70 .vortex_expect("Array length must fit into u32"),
71 encodings: Some(array_encodings),
72 },
73 )
74 .as_union_value();
75
76 buffers.extend(array_buffers.into_iter().map(|b| b.into_inner()));
77
78 (header, body_len)
79 }
80 EncoderMessage::Buffer(buffer) => {
81 let header = fb::BufferMessage::create(
82 &mut fbb,
83 &fb::BufferMessageArgs {
84 alignment_exponent: buffer.alignment().exponent(),
85 },
86 )
87 .as_union_value();
88 let body_len = buffer.len() as u64;
89 buffers.push(buffer.clone().into_inner());
90
91 (header, body_len)
92 }
93 EncoderMessage::DType(dtype) => {
94 let header =
95 fb::DTypeMessage::create(&mut fbb, &fb::DTypeMessageArgs {}).as_union_value();
96
97 let buffer = dtype.write_flatbuffer_bytes().into_inner().into_inner();
98 let body_len = buffer.len() as u64;
99 buffers.push(buffer);
100
101 (header, body_len)
102 }
103 };
104
105 let mut msg = fb::MessageBuilder::new(&mut fbb);
106 msg.add_version(Default::default());
107 msg.add_header_type(match message {
108 EncoderMessage::Array(_) => fb::MessageHeader::ArrayMessage,
109 EncoderMessage::Buffer(_) => fb::MessageHeader::BufferMessage,
110 EncoderMessage::DType(_) => fb::MessageHeader::DTypeMessage,
111 });
112 msg.add_header(header);
113 msg.add_body_size(body_len);
114 let msg = msg.finish();
115
116 fbb.finish_minimal(msg);
118 let (fbv, pos) = fbb.collapse();
119 let fb_buffer = FlatBuffer::copy_from(&fbv[pos..]);
120 let fb_buffer_len = u32::try_from(fb_buffer.len())
121 .vortex_expect("IPC flatbuffer headers must fit into u32 bytes");
122
123 buffers[0] = Bytes::from(fb_buffer_len.to_le_bytes().to_vec());
124 buffers[1] = fb_buffer.into_inner().into_inner();
125
126 buffers
127 }
128}