Skip to main content

oxigdal_streaming/
arrow_ipc.rs

1//! Zero-copy Arrow IPC framing for inter-process communication.
2//!
3//! Arrow IPC format (<https://arrow.apache.org/docs/format/IPC.html>):
4//! - **File format**: magic + schema + record batches + footer
5//! - **Stream format**: schema + record batches (no footer)
6//!
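//! This module implements only the framing layer (message headers), which is
//! what makes zero-copy deserialization possible: Arrow arrays are described by
//! offset+length pairs into the backing buffer, so their bytes can be borrowed
//! as slices rather than copied — which is what enables zero-copy reads when
//! the buffer is memory-mapped.
//!
//! The framing used here is a simplified variant of the Arrow encapsulated
//! message format: each message carries an explicit `i64` body length after its
//! 8-byte-aligned metadata, the flatbuffer metadata is treated as opaque bytes
//! (apart from a heuristic message-type guess), and no file footer is written.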
11
12use crate::error::StreamingError;
13
/// Arrow IPC file magic bytes (`"ARROW1"`).
pub const ARROW_MAGIC: &[u8] = b"ARROW1";
/// Length of [`ARROW_MAGIC`], derived from it so the two cannot drift apart.
pub const ARROW_MAGIC_LEN: usize = ARROW_MAGIC.len();
/// Alignment (in bytes) to which IPC message metadata and bodies are padded.
pub const ARROW_ALIGNMENT: usize = 8;
20
21// ── Message types ─────────────────────────────────────────────────────────────
22
/// Kind of payload carried by an Arrow IPC message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum IpcMessageType {
    /// A schema description.
    Schema,
    /// A dictionary batch (replacement dictionary values).
    DictionaryBatch,
    /// A batch of records.
    RecordBatch,
    /// A dense tensor.
    Tensor,
    /// A sparse tensor.
    SparseTensor,
}
37
38// ── Header & metadata structures ─────────────────────────────────────────────
39
40/// Parsed Arrow IPC message header (metadata only; body data is not copied).
41#[derive(Debug, Clone)]
42pub struct IpcMessageHeader {
43    /// Payload type discriminant.
44    pub message_type: IpcMessageType,
45    /// Length of the flatbuffer metadata section, in bytes.
46    pub metadata_length: i32,
47    /// Length of the binary body section, in bytes.
48    pub body_length: i64,
49    /// Absolute byte offset in the source buffer where the body starts.
50    pub body_offset: u64,
51}
52
/// Location of one Arrow buffer inside a message body.
#[derive(Clone, Debug)]
pub struct IpcBuffer {
    /// Start of the buffer in bytes, relative to the beginning of the message body.
    pub offset: i64,
    /// Size of the buffer in bytes.
    pub length: i64,
}
61
62/// Arrow IPC record batch metadata (no heap copies of array data).
63#[derive(Debug, Clone)]
64pub struct IpcRecordBatch {
65    /// Number of rows in this batch.
66    pub length: i64,
67    /// Per-column field node metadata.
68    pub nodes: Vec<IpcFieldNode>,
69    /// Buffer descriptors for all column buffers.
70    pub buffers: Vec<IpcBuffer>,
71}
72
/// Per-column node metadata inside a record batch.
#[derive(Clone, Debug)]
pub struct IpcFieldNode {
    /// Count of logical values in the column.
    pub length: i64,
    /// Count of values that are null.
    pub null_count: i64,
}
81
82// ── Reader ────────────────────────────────────────────────────────────────────
83
/// Arrow IPC message reader that walks an owned byte buffer with a cursor.
pub struct ArrowIpcReader {
    /// Backing bytes being read.
    data: Vec<u8>,
    /// Current cursor position within `data`, in bytes.
    offset: usize,
}
89
90impl ArrowIpcReader {
91    /// Creates a new reader wrapping `data`.
92    #[must_use]
93    pub fn new(data: Vec<u8>) -> Self {
94        Self { data, offset: 0 }
95    }
96
97    /// Returns `true` if `data` starts with the Arrow IPC magic bytes.
98    #[must_use]
99    pub fn is_arrow_file(&self) -> bool {
100        self.data.len() >= ARROW_MAGIC_LEN && self.data.starts_with(ARROW_MAGIC)
101    }
102
103    fn read_i32(&self, offset: usize) -> Option<i32> {
104        let bytes = self.data.get(offset..offset + 4)?;
105        Some(i32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
106    }
107
108    fn read_i64(&self, offset: usize) -> Option<i64> {
109        let bytes = self.data.get(offset..offset + 8)?;
110        Some(i64::from_le_bytes([
111            bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
112        ]))
113    }
114
115    fn read_u32(&self, offset: usize) -> Option<u32> {
116        let bytes = self.data.get(offset..offset + 4)?;
117        Some(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
118    }
119
120    /// Validates and advances past the file header (magic + 2-byte padding).
121    ///
122    /// # Errors
123    /// Returns an error if the buffer does not start with the Arrow magic bytes.
124    pub fn parse_file_header(&mut self) -> Result<(), StreamingError> {
125        if !self.is_arrow_file() {
126            return Err(StreamingError::Other("Not an Arrow IPC file".into()));
127        }
128        // 6-byte magic + 2-byte padding
129        self.offset = ARROW_MAGIC_LEN + 2;
130        Ok(())
131    }
132
133    /// Reads and returns the next IPC message header, or `Ok(None)` at EOS.
134    ///
135    /// # Errors
136    /// Returns an error if the buffer is truncated mid-header.
137    pub fn next_message(&mut self) -> Result<Option<IpcMessageHeader>, StreamingError> {
138        if self.offset + 4 > self.data.len() {
139            return Ok(None);
140        }
141
142        // Optional continuation marker (0xFFFFFFFF).
143        if let Some(cont) = self.read_u32(self.offset) {
144            if cont == 0xFFFF_FFFF {
145                self.offset += 4;
146            }
147        }
148
149        // Metadata length (i32, LE).  Zero means EOS.
150        let metadata_length = self
151            .read_i32(self.offset)
152            .ok_or_else(|| StreamingError::Other("Truncated metadata length".into()))?;
153
154        if metadata_length <= 0 {
155            return Ok(None);
156        }
157        self.offset += 4;
158
159        // Infer the message type from the flatbuffer union tag at byte offset 4
160        // inside the flatbuffer (the union type field).
161        let meta_end = self.offset + metadata_length as usize;
162        let msg_type = if meta_end <= self.data.len() && metadata_length >= 8 {
163            match self.data.get(self.offset + 4).copied().unwrap_or(0) {
164                1 => IpcMessageType::Schema,
165                2 => IpcMessageType::DictionaryBatch,
166                3 => IpcMessageType::RecordBatch,
167                4 => IpcMessageType::Tensor,
168                5 => IpcMessageType::SparseTensor,
169                _ => IpcMessageType::RecordBatch,
170            }
171        } else {
172            IpcMessageType::RecordBatch
173        };
174
175        // Advance past metadata (aligned).
176        let aligned_meta = align_to(metadata_length as usize, ARROW_ALIGNMENT);
177        self.offset += aligned_meta;
178
179        // Body length (i64, LE) follows aligned metadata.
180        let body_length = self.read_i64(self.offset).unwrap_or(0);
181        self.offset += 8;
182
183        let body_offset = self.offset as u64;
184
185        // Advance past body (aligned).
186        let aligned_body = align_to(body_length as usize, ARROW_ALIGNMENT);
187        self.offset += aligned_body;
188
189        Ok(Some(IpcMessageHeader {
190            message_type: msg_type,
191            metadata_length,
192            body_length,
193            body_offset,
194        }))
195    }
196
197    /// Returns the slice for an [`IpcBuffer`] relative to `body_offset`.
198    ///
199    /// Returns `None` if the range falls outside the backing buffer.
200    #[must_use]
201    pub fn read_buffer<'a>(&'a self, body_offset: u64, buf: &IpcBuffer) -> Option<&'a [u8]> {
202        let start = (body_offset as usize).checked_add(buf.offset as usize)?;
203        let end = start.checked_add(buf.length as usize)?;
204        self.data.get(start..end)
205    }
206
207    /// Returns the total length of the backing buffer.
208    #[must_use]
209    pub fn data_len(&self) -> usize {
210        self.data.len()
211    }
212
213    /// Returns the current read cursor position.
214    #[must_use]
215    pub fn current_offset(&self) -> usize {
216        self.offset
217    }
218}
219
220// ── Writer ────────────────────────────────────────────────────────────────────
221
/// Arrow IPC writer that works at the framing layer only.
///
/// Message headers and bodies are serialised into an in-memory buffer without
/// relying on the full Arrow crate serialiser.
pub struct ArrowIpcWriter {
    /// Output bytes accumulated so far.
    buf: Vec<u8>,
}
227
228impl ArrowIpcWriter {
229    /// Creates a new writer and writes the Arrow file magic header.
230    #[must_use]
231    pub fn new() -> Self {
232        let mut w = Self { buf: Vec::new() };
233        w.buf.extend_from_slice(ARROW_MAGIC);
234        w.buf.extend_from_slice(&[0u8; 2]); // 2-byte padding
235        w
236    }
237
238    /// Appends a framed IPC message (metadata + body) to the internal buffer.
239    pub fn write_message(&mut self, metadata: &[u8], body: &[u8]) {
240        // Continuation token.
241        self.buf.extend_from_slice(&0xFFFF_FFFFu32.to_le_bytes());
242        // Metadata length.
243        self.buf
244            .extend_from_slice(&(metadata.len() as i32).to_le_bytes());
245        // Metadata + alignment padding.
246        self.buf.extend_from_slice(metadata);
247        let meta_pad = align_to(metadata.len(), ARROW_ALIGNMENT) - metadata.len();
248        self.buf.resize(self.buf.len() + meta_pad, 0u8);
249        // Body length.
250        self.buf
251            .extend_from_slice(&(body.len() as i64).to_le_bytes());
252        // Body + alignment padding.
253        self.buf.extend_from_slice(body);
254        let body_pad = align_to(body.len(), ARROW_ALIGNMENT) - body.len();
255        self.buf.resize(self.buf.len() + body_pad, 0u8);
256    }
257
258    /// Writes the EOS marker and trailing magic, then returns the finished buffer.
259    #[must_use]
260    pub fn finish(mut self) -> Vec<u8> {
261        // EOS: continuation + zero metadata length.
262        self.buf.extend_from_slice(&0xFFFF_FFFFu32.to_le_bytes());
263        self.buf.extend_from_slice(&0i32.to_le_bytes());
264        // Trailing magic.
265        self.buf.extend_from_slice(ARROW_MAGIC);
266        self.buf
267    }
268}
269
270impl Default for ArrowIpcWriter {
271    fn default() -> Self {
272        Self::new()
273    }
274}
275
276// ── Helpers ───────────────────────────────────────────────────────────────────
277
/// Rounds `size` up to the next multiple of `alignment`.
///
/// Works for any non-zero `alignment`, not just powers of two. An `alignment`
/// of 0 returns `size` unchanged.
///
/// # Panics
/// Overflows (and panics when overflow checks are enabled) only if the rounded
/// value does not fit in a `usize`.
#[must_use]
pub const fn align_to(size: usize, alignment: usize) -> usize {
    if alignment == 0 {
        return size;
    }
    // Remainder-based rounding is correct for every alignment; the previous
    // bitmask form was only valid for powers of two.
    match size % alignment {
        0 => size,
        rem => size + (alignment - rem),
    }
}