scirs2_core/arrow_compat/
ipc.rs1use super::error::{ArrowCompatError, ArrowResult};
12use arrow::array::ArrayRef;
13use arrow::datatypes::SchemaRef;
14use arrow::ipc::reader::{FileReader, StreamReader};
15use arrow::ipc::writer::{FileWriter, StreamWriter};
16use arrow::record_batch::RecordBatch;
17use std::fs::File;
18use std::io::{BufReader, BufWriter, Cursor, Read, Seek, Write};
19use std::path::Path;
20
21pub fn record_batch_to_ipc_stream(batches: &[RecordBatch]) -> ArrowResult<Vec<u8>> {
45 if batches.is_empty() {
46 return Err(ArrowCompatError::SchemaError(
47 "No record batches to serialize".to_string(),
48 ));
49 }
50
51 let schema = batches[0].schema();
52 let mut buffer = Vec::new();
53
54 {
55 let mut writer = StreamWriter::try_new(&mut buffer, &schema)?;
56 for batch in batches {
57 writer.write(batch)?;
58 }
59 writer.finish()?;
60 }
61
62 Ok(buffer)
63}
64
65pub fn ipc_stream_to_record_batches(data: &[u8]) -> ArrowResult<Vec<RecordBatch>> {
67 let cursor = Cursor::new(data);
68 let reader = StreamReader::try_new(cursor, None)?;
69
70 let mut batches = Vec::new();
71 for batch_result in reader {
72 let batch = batch_result?;
73 batches.push(batch);
74 }
75
76 Ok(batches)
77}
78
79pub fn record_batch_to_ipc_file(batches: &[RecordBatch]) -> ArrowResult<Vec<u8>> {
84 if batches.is_empty() {
85 return Err(ArrowCompatError::SchemaError(
86 "No record batches to serialize".to_string(),
87 ));
88 }
89
90 let schema = batches[0].schema();
91 let mut buffer = Vec::new();
92
93 {
94 let mut writer = FileWriter::try_new(&mut buffer, &schema)?;
95 for batch in batches {
96 writer.write(batch)?;
97 }
98 writer.finish()?;
99 }
100
101 Ok(buffer)
102}
103
104pub fn ipc_file_to_record_batches(data: &[u8]) -> ArrowResult<Vec<RecordBatch>> {
106 let cursor = Cursor::new(data.to_vec());
107 let reader = FileReader::try_new(cursor, None)?;
108
109 let mut batches = Vec::new();
110 for batch_result in reader {
111 let batch = batch_result?;
112 batches.push(batch);
113 }
114
115 Ok(batches)
116}
117
118pub fn ipc_file_schema(data: &[u8]) -> ArrowResult<SchemaRef> {
120 let cursor = Cursor::new(data.to_vec());
121 let reader = FileReader::try_new(cursor, None)?;
122 Ok(reader.schema())
123}
124
125pub fn ipc_stream_schema(data: &[u8]) -> ArrowResult<SchemaRef> {
127 let cursor = Cursor::new(data);
128 let reader = StreamReader::try_new(cursor, None)?;
129 Ok(reader.schema())
130}
131
132pub fn write_ipc_file(path: &Path, batches: &[RecordBatch]) -> ArrowResult<()> {
156 if batches.is_empty() {
157 return Err(ArrowCompatError::SchemaError(
158 "No record batches to write".to_string(),
159 ));
160 }
161
162 let schema = batches[0].schema();
163 let file = File::create(path)?;
164 let buf_writer = BufWriter::new(file);
165
166 let mut writer = FileWriter::try_new(buf_writer, &schema)?;
167 for batch in batches {
168 writer.write(batch)?;
169 }
170 writer.finish()?;
171
172 Ok(())
173}
174
175pub fn read_ipc_file(path: &Path) -> ArrowResult<Vec<RecordBatch>> {
181 let file = File::open(path)?;
182 let buf_reader = BufReader::new(file);
183 let reader = FileReader::try_new(buf_reader, None)?;
184
185 let mut batches = Vec::new();
186 for batch_result in reader {
187 let batch = batch_result?;
188 batches.push(batch);
189 }
190
191 Ok(batches)
192}
193
194pub fn write_ipc_stream_file(path: &Path, batches: &[RecordBatch]) -> ArrowResult<()> {
196 if batches.is_empty() {
197 return Err(ArrowCompatError::SchemaError(
198 "No record batches to write".to_string(),
199 ));
200 }
201
202 let schema = batches[0].schema();
203 let file = File::create(path)?;
204 let buf_writer = BufWriter::new(file);
205
206 let mut writer = StreamWriter::try_new(buf_writer, &schema)?;
207 for batch in batches {
208 writer.write(batch)?;
209 }
210 writer.finish()?;
211
212 Ok(())
213}
214
215pub fn read_ipc_stream_file(path: &Path) -> ArrowResult<Vec<RecordBatch>> {
217 let file = File::open(path)?;
218 let buf_reader = BufReader::new(file);
219 let reader = StreamReader::try_new(buf_reader, None)?;
220
221 let mut batches = Vec::new();
222 for batch_result in reader {
223 let batch = batch_result?;
224 batches.push(batch);
225 }
226
227 Ok(batches)
228}
229
230pub fn mmap_read_ipc_file(path: &Path) -> ArrowResult<MmapIpcReader> {
250 MmapIpcReader::open(path)
251}
252
253pub struct MmapIpcReader {
258 data: memmap2::Mmap,
260 schema: SchemaRef,
262 num_batches: usize,
264}
265
266impl MmapIpcReader {
267 pub fn open(path: &Path) -> ArrowResult<Self> {
269 let file = File::open(path)?;
270
271 let mmap = unsafe { memmap2::Mmap::map(&file)? };
274
275 let cursor = Cursor::new(mmap.as_ref());
277 let reader = FileReader::try_new(cursor, None)?;
278 let schema = reader.schema();
279 let num_batches = reader.num_batches();
280
281 Ok(Self {
282 data: mmap,
283 schema,
284 num_batches,
285 })
286 }
287
288 pub fn schema(&self) -> &SchemaRef {
290 &self.schema
291 }
292
293 pub fn num_batches(&self) -> usize {
295 self.num_batches
296 }
297
298 pub fn read_batch(&self, index: usize) -> ArrowResult<RecordBatch> {
300 if index >= self.num_batches {
301 return Err(ArrowCompatError::ColumnOutOfBounds {
302 index,
303 num_columns: self.num_batches,
304 });
305 }
306
307 let cursor = Cursor::new(self.data.as_ref());
308 let reader = FileReader::try_new(cursor, None)?;
309
310 for (i, batch_result) in reader.enumerate() {
311 if i == index {
312 return batch_result.map_err(ArrowCompatError::from);
313 }
314 }
315
316 Err(ArrowCompatError::SchemaError(format!(
317 "Batch index {} not found (file has {} batches)",
318 index, self.num_batches
319 )))
320 }
321
322 pub fn read_all_batches(&self) -> ArrowResult<Vec<RecordBatch>> {
324 let cursor = Cursor::new(self.data.as_ref());
325 let reader = FileReader::try_new(cursor, None)?;
326
327 let mut batches = Vec::with_capacity(self.num_batches);
328 for batch_result in reader {
329 let batch = batch_result?;
330 batches.push(batch);
331 }
332
333 Ok(batches)
334 }
335}
336
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow_compat::conversions::array2_to_record_batch;
    use ndarray::Array2;
    use std::path::{Path, PathBuf};

    /// Path to a scratch file in the system temp directory that is removed
    /// when the guard is dropped, including when a test assertion panics.
    struct TempPath(PathBuf);

    impl TempPath {
        /// Builds a path whose file name is prefixed with the current process
        /// id so concurrent test processes do not overwrite each other's files.
        fn new(file_name: &str) -> Self {
            let unique_name = format!("{}_{}", std::process::id(), file_name);
            Self(std::env::temp_dir().join(unique_name))
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempPath {
        fn drop(&mut self) {
            // Best-effort cleanup: the file may not exist if the test failed
            // before writing it.
            let _ = std::fs::remove_file(&self.0);
        }
    }

    /// Builds a 5-row, 3-column `f64` batch with columns named x, y, z.
    fn make_test_batch() -> RecordBatch {
        let arr = Array2::from_shape_vec((5, 3), (0..15).map(|i| i as f64).collect())
            .expect("shape error");
        array2_to_record_batch(&arr, Some(&["x", "y", "z"])).expect("conversion failed")
    }

    #[test]
    fn test_ipc_stream_roundtrip() {
        let batch = make_test_batch();
        let bytes =
            record_batch_to_ipc_stream(std::slice::from_ref(&batch)).expect("serialize failed");
        let recovered = ipc_stream_to_record_batches(&bytes).expect("deserialize failed");

        assert_eq!(recovered.len(), 1);
        assert_eq!(recovered[0].num_rows(), 5);
        assert_eq!(recovered[0].num_columns(), 3);
        assert_eq!(recovered[0].schema(), batch.schema());
    }

    #[test]
    fn test_ipc_stream_multiple_batches() {
        let batch1 = make_test_batch();
        let batch2 = make_test_batch();
        let bytes = record_batch_to_ipc_stream(&[batch1, batch2]).expect("serialize failed");
        let recovered = ipc_stream_to_record_batches(&bytes).expect("deserialize failed");

        assert_eq!(recovered.len(), 2);
    }

    #[test]
    fn test_ipc_stream_empty_batches_error() {
        let result = record_batch_to_ipc_stream(&[]);
        assert!(result.is_err());
    }

    #[test]
    fn test_ipc_file_roundtrip() {
        let batch = make_test_batch();
        let bytes =
            record_batch_to_ipc_file(std::slice::from_ref(&batch)).expect("serialize failed");
        let recovered = ipc_file_to_record_batches(&bytes).expect("deserialize failed");

        assert_eq!(recovered.len(), 1);
        assert_eq!(recovered[0].num_rows(), 5);
        assert_eq!(recovered[0].num_columns(), 3);
    }

    #[test]
    fn test_ipc_file_schema_extraction() {
        let batch = make_test_batch();
        let bytes = record_batch_to_ipc_file(&[batch]).expect("serialize failed");
        let schema = ipc_file_schema(&bytes).expect("schema extraction failed");

        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(0).name(), "x");
    }

    #[test]
    fn test_ipc_stream_schema_extraction() {
        let batch = make_test_batch();
        let bytes = record_batch_to_ipc_stream(&[batch]).expect("serialize failed");
        let schema = ipc_stream_schema(&bytes).expect("schema extraction failed");

        assert_eq!(schema.fields().len(), 3);
        assert_eq!(schema.field(0).name(), "x");
    }

    #[test]
    fn test_file_ipc_roundtrip() {
        let batch = make_test_batch();
        let tmp = TempPath::new("scirs2_arrow_test_ipc.arrow");

        write_ipc_file(tmp.path(), std::slice::from_ref(&batch)).expect("write failed");
        let recovered = read_ipc_file(tmp.path()).expect("read failed");

        assert_eq!(recovered.len(), 1);
        assert_eq!(recovered[0].num_rows(), 5);
        assert_eq!(recovered[0].num_columns(), 3);
    }

    #[test]
    fn test_file_ipc_stream_roundtrip() {
        let batch = make_test_batch();
        let tmp = TempPath::new("scirs2_arrow_test_ipc_stream.arrows");

        write_ipc_stream_file(tmp.path(), std::slice::from_ref(&batch)).expect("write failed");
        let recovered = read_ipc_stream_file(tmp.path()).expect("read failed");

        assert_eq!(recovered.len(), 1);
        assert_eq!(recovered[0].num_rows(), 5);
        assert_eq!(recovered[0].num_columns(), 3);
    }

    #[test]
    fn test_mmap_ipc_reader() {
        let batch = make_test_batch();
        let tmp = TempPath::new("scirs2_arrow_test_mmap.arrow");

        write_ipc_file(tmp.path(), std::slice::from_ref(&batch)).expect("write failed");

        let reader = mmap_read_ipc_file(tmp.path()).expect("mmap open failed");
        assert_eq!(reader.num_batches(), 1);
        assert_eq!(reader.schema().fields().len(), 3);

        let read_batch = reader.read_batch(0).expect("read_batch failed");
        assert_eq!(read_batch.num_rows(), 5);

        let all = reader.read_all_batches().expect("read_all failed");
        assert_eq!(all.len(), 1);
    }

    #[test]
    fn test_mmap_reader_multiple_batches() {
        let tmp = TempPath::new("scirs2_arrow_test_mmap_multi.arrow");

        write_ipc_file(tmp.path(), &[make_test_batch(), make_test_batch()])
            .expect("write failed");

        let reader = mmap_read_ipc_file(tmp.path()).expect("mmap open failed");
        assert_eq!(reader.num_batches(), 2);

        // Random access to a batch that is not the first one.
        let second = reader.read_batch(1).expect("read_batch failed");
        assert_eq!(second.num_rows(), 5);
        assert_eq!(second.num_columns(), 3);

        let all = reader.read_all_batches().expect("read_all failed");
        assert_eq!(all.len(), 2);
    }

    #[test]
    fn test_mmap_reader_batch_out_of_bounds() {
        let batch = make_test_batch();
        let tmp = TempPath::new("scirs2_arrow_test_mmap_oob.arrow");

        write_ipc_file(tmp.path(), &[batch]).expect("write failed");

        let reader = mmap_read_ipc_file(tmp.path()).expect("mmap open failed");
        let result = reader.read_batch(10);
        assert!(result.is_err());
    }
}