oxigdal_streaming/
arrow_ipc.rs1use crate::error::StreamingError;
13
/// Magic byte sequence that marks an Arrow IPC file.
///
/// The writer in this file emits it at the very start of the buffer and again
/// as the final bytes; the reader only checks the leading occurrence.
pub const ARROW_MAGIC: &[u8] = b"ARROW1";

/// Number of bytes in [`ARROW_MAGIC`] (6).
///
/// Derived from the magic itself so the two constants cannot drift apart.
pub const ARROW_MAGIC_LEN: usize = ARROW_MAGIC.len();

/// Byte boundary to which metadata and body sections are padded.
pub const ARROW_ALIGNMENT: usize = 8;
20
/// Kind of payload carried by an IPC message.
///
/// `ArrowIpcReader::next_message` derives the variant from a single tag byte
/// in the metadata (the value noted on each variant). Any other tag value is
/// reported as [`IpcMessageType::RecordBatch`].
///
/// The enum is a plain tag, so it is `Copy` and `Hash` in addition to being
/// comparable; callers can pass it by value and use it as a map/set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IpcMessageType {
    /// Schema message (tag byte `1`).
    Schema,
    /// Dictionary batch message (tag byte `2`).
    DictionaryBatch,
    /// Record batch message (tag byte `3`; also the fallback for unknown tags).
    RecordBatch,
    /// Tensor message (tag byte `4`).
    Tensor,
    /// Sparse tensor message (tag byte `5`).
    SparseTensor,
}
37
38#[derive(Debug, Clone)]
42pub struct IpcMessageHeader {
43 pub message_type: IpcMessageType,
45 pub metadata_length: i32,
47 pub body_length: i64,
49 pub body_offset: u64,
51}
52
/// Location of one buffer inside a message body.
///
/// `ArrowIpcReader::read_buffer` resolves it to a byte slice by adding
/// `offset` to the body start and taking `length` bytes.
///
/// The struct is two plain integers, so it is `Copy`, comparable and hashable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct IpcBuffer {
    /// Byte offset of the buffer, relative to the start of the message body.
    pub offset: i64,
    /// Size of the buffer in bytes.
    pub length: i64,
}
61
62#[derive(Debug, Clone)]
64pub struct IpcRecordBatch {
65 pub length: i64,
67 pub nodes: Vec<IpcFieldNode>,
69 pub buffers: Vec<IpcBuffer>,
71}
72
/// Per-field counts attached to a record batch.
///
/// NOTE(review): not interpreted anywhere in this file; the field meanings
/// presumably mirror the Arrow `FieldNode` struct — confirm with the producer.
///
/// The struct is two plain integers, so it is `Copy`, comparable and hashable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct IpcFieldNode {
    /// Presumably the number of values in the field.
    pub length: i64,
    /// Presumably how many of those values are null.
    pub null_count: i64,
}
81
82pub struct ArrowIpcReader {
86 data: Vec<u8>,
87 offset: usize,
88}
89
90impl ArrowIpcReader {
91 #[must_use]
93 pub fn new(data: Vec<u8>) -> Self {
94 Self { data, offset: 0 }
95 }
96
97 #[must_use]
99 pub fn is_arrow_file(&self) -> bool {
100 self.data.len() >= ARROW_MAGIC_LEN && self.data.starts_with(ARROW_MAGIC)
101 }
102
103 fn read_i32(&self, offset: usize) -> Option<i32> {
104 let bytes = self.data.get(offset..offset + 4)?;
105 Some(i32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
106 }
107
108 fn read_i64(&self, offset: usize) -> Option<i64> {
109 let bytes = self.data.get(offset..offset + 8)?;
110 Some(i64::from_le_bytes([
111 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
112 ]))
113 }
114
115 fn read_u32(&self, offset: usize) -> Option<u32> {
116 let bytes = self.data.get(offset..offset + 4)?;
117 Some(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
118 }
119
120 pub fn parse_file_header(&mut self) -> Result<(), StreamingError> {
125 if !self.is_arrow_file() {
126 return Err(StreamingError::Other("Not an Arrow IPC file".into()));
127 }
128 self.offset = ARROW_MAGIC_LEN + 2;
130 Ok(())
131 }
132
133 pub fn next_message(&mut self) -> Result<Option<IpcMessageHeader>, StreamingError> {
138 if self.offset + 4 > self.data.len() {
139 return Ok(None);
140 }
141
142 if let Some(cont) = self.read_u32(self.offset) {
144 if cont == 0xFFFF_FFFF {
145 self.offset += 4;
146 }
147 }
148
149 let metadata_length = self
151 .read_i32(self.offset)
152 .ok_or_else(|| StreamingError::Other("Truncated metadata length".into()))?;
153
154 if metadata_length <= 0 {
155 return Ok(None);
156 }
157 self.offset += 4;
158
159 let meta_end = self.offset + metadata_length as usize;
162 let msg_type = if meta_end <= self.data.len() && metadata_length >= 8 {
163 match self.data.get(self.offset + 4).copied().unwrap_or(0) {
164 1 => IpcMessageType::Schema,
165 2 => IpcMessageType::DictionaryBatch,
166 3 => IpcMessageType::RecordBatch,
167 4 => IpcMessageType::Tensor,
168 5 => IpcMessageType::SparseTensor,
169 _ => IpcMessageType::RecordBatch,
170 }
171 } else {
172 IpcMessageType::RecordBatch
173 };
174
175 let aligned_meta = align_to(metadata_length as usize, ARROW_ALIGNMENT);
177 self.offset += aligned_meta;
178
179 let body_length = self.read_i64(self.offset).unwrap_or(0);
181 self.offset += 8;
182
183 let body_offset = self.offset as u64;
184
185 let aligned_body = align_to(body_length as usize, ARROW_ALIGNMENT);
187 self.offset += aligned_body;
188
189 Ok(Some(IpcMessageHeader {
190 message_type: msg_type,
191 metadata_length,
192 body_length,
193 body_offset,
194 }))
195 }
196
197 #[must_use]
201 pub fn read_buffer<'a>(&'a self, body_offset: u64, buf: &IpcBuffer) -> Option<&'a [u8]> {
202 let start = (body_offset as usize).checked_add(buf.offset as usize)?;
203 let end = start.checked_add(buf.length as usize)?;
204 self.data.get(start..end)
205 }
206
207 #[must_use]
209 pub fn data_len(&self) -> usize {
210 self.data.len()
211 }
212
213 #[must_use]
215 pub fn current_offset(&self) -> usize {
216 self.offset
217 }
218}
219
220pub struct ArrowIpcWriter {
225 buf: Vec<u8>,
226}
227
228impl ArrowIpcWriter {
229 #[must_use]
231 pub fn new() -> Self {
232 let mut w = Self { buf: Vec::new() };
233 w.buf.extend_from_slice(ARROW_MAGIC);
234 w.buf.extend_from_slice(&[0u8; 2]); w
236 }
237
238 pub fn write_message(&mut self, metadata: &[u8], body: &[u8]) {
240 self.buf.extend_from_slice(&0xFFFF_FFFFu32.to_le_bytes());
242 self.buf
244 .extend_from_slice(&(metadata.len() as i32).to_le_bytes());
245 self.buf.extend_from_slice(metadata);
247 let meta_pad = align_to(metadata.len(), ARROW_ALIGNMENT) - metadata.len();
248 self.buf.resize(self.buf.len() + meta_pad, 0u8);
249 self.buf
251 .extend_from_slice(&(body.len() as i64).to_le_bytes());
252 self.buf.extend_from_slice(body);
254 let body_pad = align_to(body.len(), ARROW_ALIGNMENT) - body.len();
255 self.buf.resize(self.buf.len() + body_pad, 0u8);
256 }
257
258 #[must_use]
260 pub fn finish(mut self) -> Vec<u8> {
261 self.buf.extend_from_slice(&0xFFFF_FFFFu32.to_le_bytes());
263 self.buf.extend_from_slice(&0i32.to_le_bytes());
264 self.buf.extend_from_slice(ARROW_MAGIC);
266 self.buf
267 }
268}
269
270impl Default for ArrowIpcWriter {
271 fn default() -> Self {
272 Self::new()
273 }
274}
275
/// Rounds `size` up to the next multiple of `alignment`.
///
/// * `alignment == 0` is treated as "no alignment" and returns `size`
///   unchanged.
/// * Any non-zero alignment is supported, not only powers of two.
/// * If the rounded value would not fit in a `usize`, the result saturates at
///   `usize::MAX` instead of overflowing.
#[must_use]
pub fn align_to(size: usize, alignment: usize) -> usize {
    if alignment == 0 {
        return size;
    }
    // Remainder arithmetic is exact for every alignment; a bit mask is only
    // correct when `alignment` is a power of two.
    match size % alignment {
        0 => size,
        remainder => size.saturating_add(alignment - remainder),
    }
}