Skip to main content

scirs2_core/arrow_compat/
ipc.rs

1//! Arrow IPC (Inter-Process Communication) support
2//!
3//! Provides serialization/deserialization of Arrow arrays and RecordBatches
4//! to/from the Arrow IPC format for cross-process data sharing.
5//!
6//! Supports:
7//! - Streaming IPC format (for sequential writes)
8//! - File IPC format (for random access)
9//! - Memory-mapped IPC files (for shared memory access)
10
11use super::error::{ArrowCompatError, ArrowResult};
12use arrow::array::ArrayRef;
13use arrow::datatypes::SchemaRef;
14use arrow::ipc::reader::{FileReader, StreamReader};
15use arrow::ipc::writer::{FileWriter, StreamWriter};
16use arrow::record_batch::RecordBatch;
17use std::fs::File;
18use std::io::{BufReader, BufWriter, Cursor, Read, Seek, Write};
19use std::path::Path;
20
21// =============================================================================
22// In-memory IPC serialization (bytes)
23// =============================================================================
24
25/// Serialize a `RecordBatch` to Arrow IPC streaming format (bytes)
26///
27/// The streaming format is suitable for sequential reads and
28/// cross-process communication via pipes or sockets.
29///
30/// # Examples
31///
32/// ```rust
33/// # use scirs2_core::arrow_compat::ipc::{record_batch_to_ipc_stream, ipc_stream_to_record_batches};
34/// # use scirs2_core::arrow_compat::conversions::array2_to_record_batch;
35/// # use ndarray::Array2;
36/// let arr = Array2::from_shape_vec((3, 2), vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0])
37///     .expect("shape error");
38/// let batch = array2_to_record_batch(&arr, None).expect("conversion failed");
39/// let bytes = record_batch_to_ipc_stream(&[batch]).expect("serialization failed");
40/// let recovered = ipc_stream_to_record_batches(&bytes).expect("deserialization failed");
41/// assert_eq!(recovered.len(), 1);
42/// assert_eq!(recovered[0].num_rows(), 3);
43/// ```
44pub fn record_batch_to_ipc_stream(batches: &[RecordBatch]) -> ArrowResult<Vec<u8>> {
45    if batches.is_empty() {
46        return Err(ArrowCompatError::SchemaError(
47            "No record batches to serialize".to_string(),
48        ));
49    }
50
51    let schema = batches[0].schema();
52    let mut buffer = Vec::new();
53
54    {
55        let mut writer = StreamWriter::try_new(&mut buffer, &schema)?;
56        for batch in batches {
57            writer.write(batch)?;
58        }
59        writer.finish()?;
60    }
61
62    Ok(buffer)
63}
64
65/// Deserialize `RecordBatch` instances from Arrow IPC streaming format (bytes)
66pub fn ipc_stream_to_record_batches(data: &[u8]) -> ArrowResult<Vec<RecordBatch>> {
67    let cursor = Cursor::new(data);
68    let reader = StreamReader::try_new(cursor, None)?;
69
70    let mut batches = Vec::new();
71    for batch_result in reader {
72        let batch = batch_result?;
73        batches.push(batch);
74    }
75
76    Ok(batches)
77}
78
79/// Serialize a `RecordBatch` to Arrow IPC file format (bytes)
80///
81/// The file format includes a footer for random access to record batches,
82/// making it suitable for memory-mapped access and random reads.
83pub fn record_batch_to_ipc_file(batches: &[RecordBatch]) -> ArrowResult<Vec<u8>> {
84    if batches.is_empty() {
85        return Err(ArrowCompatError::SchemaError(
86            "No record batches to serialize".to_string(),
87        ));
88    }
89
90    let schema = batches[0].schema();
91    let mut buffer = Vec::new();
92
93    {
94        let mut writer = FileWriter::try_new(&mut buffer, &schema)?;
95        for batch in batches {
96            writer.write(batch)?;
97        }
98        writer.finish()?;
99    }
100
101    Ok(buffer)
102}
103
104/// Deserialize `RecordBatch` instances from Arrow IPC file format (bytes)
105pub fn ipc_file_to_record_batches(data: &[u8]) -> ArrowResult<Vec<RecordBatch>> {
106    let cursor = Cursor::new(data.to_vec());
107    let reader = FileReader::try_new(cursor, None)?;
108
109    let mut batches = Vec::new();
110    for batch_result in reader {
111        let batch = batch_result?;
112        batches.push(batch);
113    }
114
115    Ok(batches)
116}
117
118/// Get the schema from an IPC file format buffer without reading the data
119pub fn ipc_file_schema(data: &[u8]) -> ArrowResult<SchemaRef> {
120    let cursor = Cursor::new(data.to_vec());
121    let reader = FileReader::try_new(cursor, None)?;
122    Ok(reader.schema())
123}
124
125/// Get the schema from an IPC stream format buffer without reading all data
126pub fn ipc_stream_schema(data: &[u8]) -> ArrowResult<SchemaRef> {
127    let cursor = Cursor::new(data);
128    let reader = StreamReader::try_new(cursor, None)?;
129    Ok(reader.schema())
130}
131
132// =============================================================================
133// File-based IPC operations
134// =============================================================================
135
136/// Write `RecordBatch` instances to an Arrow IPC file on disk
137///
138/// # Arguments
139///
140/// * `path` - File path to write to
141/// * `batches` - Record batches to write
142///
143/// # Examples
144///
145/// ```rust,no_run
146/// # use scirs2_core::arrow_compat::ipc::write_ipc_file;
147/// # use scirs2_core::arrow_compat::conversions::array2_to_record_batch;
148/// # use ndarray::Array2;
149/// # use std::path::Path;
150/// let arr = Array2::from_shape_vec((100, 3), (0..300).map(|i| i as f64).collect())
151///     .expect("shape error");
152/// let batch = array2_to_record_batch(&arr, Some(&["x", "y", "z"])).expect("conversion failed");
153/// write_ipc_file(Path::new("/tmp/data.arrow"), &[batch]).expect("write failed");
154/// ```
155pub fn write_ipc_file(path: &Path, batches: &[RecordBatch]) -> ArrowResult<()> {
156    if batches.is_empty() {
157        return Err(ArrowCompatError::SchemaError(
158            "No record batches to write".to_string(),
159        ));
160    }
161
162    let schema = batches[0].schema();
163    let file = File::create(path)?;
164    let buf_writer = BufWriter::new(file);
165
166    let mut writer = FileWriter::try_new(buf_writer, &schema)?;
167    for batch in batches {
168        writer.write(batch)?;
169    }
170    writer.finish()?;
171
172    Ok(())
173}
174
175/// Read `RecordBatch` instances from an Arrow IPC file on disk
176///
177/// # Arguments
178///
179/// * `path` - File path to read from
180pub fn read_ipc_file(path: &Path) -> ArrowResult<Vec<RecordBatch>> {
181    let file = File::open(path)?;
182    let buf_reader = BufReader::new(file);
183    let reader = FileReader::try_new(buf_reader, None)?;
184
185    let mut batches = Vec::new();
186    for batch_result in reader {
187        let batch = batch_result?;
188        batches.push(batch);
189    }
190
191    Ok(batches)
192}
193
194/// Write `RecordBatch` instances to an Arrow IPC stream file on disk
195pub fn write_ipc_stream_file(path: &Path, batches: &[RecordBatch]) -> ArrowResult<()> {
196    if batches.is_empty() {
197        return Err(ArrowCompatError::SchemaError(
198            "No record batches to write".to_string(),
199        ));
200    }
201
202    let schema = batches[0].schema();
203    let file = File::create(path)?;
204    let buf_writer = BufWriter::new(file);
205
206    let mut writer = StreamWriter::try_new(buf_writer, &schema)?;
207    for batch in batches {
208        writer.write(batch)?;
209    }
210    writer.finish()?;
211
212    Ok(())
213}
214
215/// Read `RecordBatch` instances from an Arrow IPC stream file on disk
216pub fn read_ipc_stream_file(path: &Path) -> ArrowResult<Vec<RecordBatch>> {
217    let file = File::open(path)?;
218    let buf_reader = BufReader::new(file);
219    let reader = StreamReader::try_new(buf_reader, None)?;
220
221    let mut batches = Vec::new();
222    for batch_result in reader {
223        let batch = batch_result?;
224        batches.push(batch);
225    }
226
227    Ok(batches)
228}
229
230// =============================================================================
231// Memory-mapped IPC file support
232// =============================================================================
233
234/// Open a memory-mapped Arrow IPC file for reading
235///
236/// Memory-mapped access is efficient for large files as it avoids
237/// loading the entire file into memory. The operating system handles
238/// paging data in and out as needed.
239///
240/// # Arguments
241///
242/// * `path` - Path to the Arrow IPC file
243///
244/// # Returns
245///
246/// A vector of `RecordBatch` instances read from the memory-mapped file.
247/// Note: The actual mmap is managed internally by reading through
248/// the standard file reader with OS-level memory mapping.
249pub fn mmap_read_ipc_file(path: &Path) -> ArrowResult<MmapIpcReader> {
250    MmapIpcReader::open(path)
251}
252
253/// Memory-mapped IPC file reader
254///
255/// Provides lazy access to record batches in an Arrow IPC file
256/// without loading the entire file into memory upfront.
257pub struct MmapIpcReader {
258    /// The raw file data loaded via mmap
259    data: memmap2::Mmap,
260    /// Schema of the IPC file
261    schema: SchemaRef,
262    /// Number of record batches in the file
263    num_batches: usize,
264}
265
266impl MmapIpcReader {
267    /// Open an Arrow IPC file with memory mapping
268    pub fn open(path: &Path) -> ArrowResult<Self> {
269        let file = File::open(path)?;
270
271        // Safety: We ensure the file is not modified while mapped
272        // by holding the File handle. The mmap is read-only.
273        let mmap = unsafe { memmap2::Mmap::map(&file)? };
274
275        // Parse the file to get schema and batch count
276        let cursor = Cursor::new(mmap.as_ref());
277        let reader = FileReader::try_new(cursor, None)?;
278        let schema = reader.schema();
279        let num_batches = reader.num_batches();
280
281        Ok(Self {
282            data: mmap,
283            schema,
284            num_batches,
285        })
286    }
287
288    /// Get the schema of the IPC file
289    pub fn schema(&self) -> &SchemaRef {
290        &self.schema
291    }
292
293    /// Get the number of record batches in the file
294    pub fn num_batches(&self) -> usize {
295        self.num_batches
296    }
297
298    /// Read a specific record batch by index
299    pub fn read_batch(&self, index: usize) -> ArrowResult<RecordBatch> {
300        if index >= self.num_batches {
301            return Err(ArrowCompatError::ColumnOutOfBounds {
302                index,
303                num_columns: self.num_batches,
304            });
305        }
306
307        let cursor = Cursor::new(self.data.as_ref());
308        let reader = FileReader::try_new(cursor, None)?;
309
310        for (i, batch_result) in reader.enumerate() {
311            if i == index {
312                return batch_result.map_err(ArrowCompatError::from);
313            }
314        }
315
316        Err(ArrowCompatError::SchemaError(format!(
317            "Batch index {} not found (file has {} batches)",
318            index, self.num_batches
319        )))
320    }
321
322    /// Read all record batches from the memory-mapped file
323    pub fn read_all_batches(&self) -> ArrowResult<Vec<RecordBatch>> {
324        let cursor = Cursor::new(self.data.as_ref());
325        let reader = FileReader::try_new(cursor, None)?;
326
327        let mut batches = Vec::with_capacity(self.num_batches);
328        for batch_result in reader {
329            let batch = batch_result?;
330            batches.push(batch);
331        }
332
333        Ok(batches)
334    }
335}
336
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow_compat::conversions::array2_to_record_batch;
    use ndarray::Array2;
    use std::path::PathBuf;

    /// Temp-file path that is deleted when the guard goes out of scope.
    ///
    /// The process id is part of the file name so concurrent test runs sharing
    /// the same temp directory do not overwrite each other's files, and the
    /// `Drop` impl removes the file even when an assertion panics.
    struct TempIpcFile(PathBuf);

    impl TempIpcFile {
        fn new(name: &str) -> Self {
            let file_name = format!("scirs2_arrow_test_{}_{}", std::process::id(), name);
            Self(std::env::temp_dir().join(file_name))
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempIpcFile {
        fn drop(&mut self) {
            // Best-effort cleanup; a missing file is not an error here.
            let _ = std::fs::remove_file(&self.0);
        }
    }

    /// Build a three-column (`x`, `y`, `z`) batch with the requested number of rows.
    fn make_batch_with_rows(rows: usize) -> RecordBatch {
        let values: Vec<f64> = (0..rows * 3).map(|i| i as f64).collect();
        let arr = Array2::from_shape_vec((rows, 3), values).expect("shape error");
        array2_to_record_batch(&arr, Some(&["x", "y", "z"])).expect("conversion failed")
    }

    fn make_test_batch() -> RecordBatch {
        make_batch_with_rows(5)
    }

    // -------------------------------------------------------
    // In-memory IPC stream tests
    // -------------------------------------------------------

    #[test]
    fn test_ipc_stream_roundtrip() {
        let batch = make_test_batch();
        let bytes =
            record_batch_to_ipc_stream(std::slice::from_ref(&batch)).expect("serialize failed");
        let recovered = ipc_stream_to_record_batches(&bytes).expect("deserialize failed");

        assert_eq!(recovered.len(), 1);
        assert_eq!(recovered[0].num_rows(), 5);
        assert_eq!(recovered[0].num_columns(), 3);
        assert_eq!(recovered[0].schema(), batch.schema());
    }

    #[test]
    fn test_ipc_stream_multiple_batches() {
        let batch1 = make_test_batch();
        let batch2 = make_test_batch();
        let bytes = record_batch_to_ipc_stream(&[batch1, batch2]).expect("serialize failed");
        let recovered = ipc_stream_to_record_batches(&bytes).expect("deserialize failed");

        assert_eq!(recovered.len(), 2);
    }

    #[test]
    fn test_ipc_stream_empty_batches_error() {
        let result = record_batch_to_ipc_stream(&[]);
        assert!(result.is_err());
    }

    // -------------------------------------------------------
    // In-memory IPC file format tests
    // -------------------------------------------------------

    #[test]
    fn test_ipc_file_roundtrip() {
        let batch = make_test_batch();
        let bytes =
            record_batch_to_ipc_file(std::slice::from_ref(&batch)).expect("serialize failed");
        let recovered = ipc_file_to_record_batches(&bytes).expect("deserialize failed");

        assert_eq!(recovered.len(), 1);
        assert_eq!(recovered[0].num_rows(), 5);
        assert_eq!(recovered[0].num_columns(), 3);
    }

    #[test]
    fn test_ipc_file_schema_extraction() {
        let batch = make_test_batch();
        let bytes = record_batch_to_ipc_file(&[batch]).expect("serialize failed");
        let schema = ipc_file_schema(&bytes).expect("schema extraction failed");

        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(0).name(), "x");
    }

    #[test]
    fn test_ipc_stream_schema_extraction() {
        let batch = make_test_batch();
        let bytes = record_batch_to_ipc_stream(&[batch]).expect("serialize failed");
        let schema = ipc_stream_schema(&bytes).expect("schema extraction failed");

        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(0).name(), "x");
    }

    // -------------------------------------------------------
    // File-based IPC tests
    // -------------------------------------------------------

    #[test]
    fn test_file_ipc_roundtrip() {
        let batch = make_test_batch();
        let tmp = TempIpcFile::new("ipc.arrow");

        write_ipc_file(tmp.path(), std::slice::from_ref(&batch)).expect("write failed");
        let recovered = read_ipc_file(tmp.path()).expect("read failed");

        assert_eq!(recovered.len(), 1);
        assert_eq!(recovered[0].num_rows(), 5);
        assert_eq!(recovered[0].num_columns(), 3);
    }

    #[test]
    fn test_file_ipc_stream_roundtrip() {
        let batch = make_test_batch();
        let tmp = TempIpcFile::new("ipc_stream.arrows");

        write_ipc_stream_file(tmp.path(), std::slice::from_ref(&batch)).expect("write failed");
        let recovered = read_ipc_stream_file(tmp.path()).expect("read failed");

        assert_eq!(recovered.len(), 1);
        assert_eq!(recovered[0].num_rows(), 5);
        assert_eq!(recovered[0].num_columns(), 3);
    }

    // -------------------------------------------------------
    // Memory-mapped IPC tests
    // -------------------------------------------------------

    #[test]
    fn test_mmap_ipc_reader() {
        let batch = make_test_batch();
        // Declared before the reader so the mapping is dropped before the file is removed.
        let tmp = TempIpcFile::new("mmap.arrow");

        write_ipc_file(tmp.path(), std::slice::from_ref(&batch)).expect("write failed");

        let reader = mmap_read_ipc_file(tmp.path()).expect("mmap open failed");
        assert_eq!(reader.num_batches(), 1);
        assert_eq!(reader.schema().fields().len(), 3);

        let read_batch = reader.read_batch(0).expect("read_batch failed");
        assert_eq!(read_batch.num_rows(), 5);

        let all = reader.read_all_batches().expect("read_all failed");
        assert_eq!(all.len(), 1);
    }

    #[test]
    fn test_mmap_reader_random_access() {
        // Different row counts make it observable which batch was returned.
        let batches = [make_batch_with_rows(2), make_batch_with_rows(7)];
        let tmp = TempIpcFile::new("mmap_random.arrow");

        write_ipc_file(tmp.path(), &batches).expect("write failed");

        let reader = mmap_read_ipc_file(tmp.path()).expect("mmap open failed");
        assert_eq!(reader.num_batches(), 2);
        assert_eq!(reader.read_batch(1).expect("read_batch(1) failed").num_rows(), 7);
        assert_eq!(reader.read_batch(0).expect("read_batch(0) failed").num_rows(), 2);
    }

    #[test]
    fn test_mmap_reader_batch_out_of_bounds() {
        let batch = make_test_batch();
        let tmp = TempIpcFile::new("mmap_oob.arrow");

        write_ipc_file(tmp.path(), &[batch]).expect("write failed");

        let reader = mmap_read_ipc_file(tmp.path()).expect("mmap open failed");
        let result = reader.read_batch(10);
        assert!(result.is_err());
    }
}