use std::collections::{HashMap, HashSet, VecDeque};
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use futures_core::Stream;
use minarrow::{ArrowType, Field, Table, Vec64};
use crate::arrow::message::org::apache::arrow::flatbuf as fbm;
use crate::compression::{Compression, compress};
use crate::constants::DEFAULT_FRAME_ALLOCATION_SIZE;
use crate::enums::{IPCMessageProtocol, WriterState};
use crate::models::encoders::ipc::protocol::{IPCFrame, IPCFrameEncoder};
use crate::models::encoders::ipc::schema::build_flatbuf_recordbatch;
use crate::models::encoders::ipc::schema::{
FooterBlockMeta, build_flatbuf_footer, build_flatbuf_schema, encode_flatbuf_dictionary,
};
use crate::traits::frame_encoder::FrameEncoder;
use crate::traits::stream_buffer::StreamBuffer;
use crate::utils::{align_to, as_bytes};
use minarrow::{Array, Bitmask, NumericArray, TextArray};
/// Stream encoder emitting frames backed by standard `Vec<u8>` buffers.
pub type TableStreamEncoder = GTableStreamEncoder<Vec<u8>>;
/// Stream encoder emitting frames backed by `Vec64<u8>` buffers.
pub type TableStreamEncoder64 = GTableStreamEncoder<Vec64<u8>>;
/// Streaming Arrow IPC encoder, generic over the output buffer type `B`.
///
/// The `write_*` methods encode frames eagerly onto `out_frames`; consumers
/// drain them directly or via the [`Stream`] implementation.
pub struct GTableStreamEncoder<B>
where
    B: StreamBuffer + 'static,
{
    /// Arrow IPC framing variant (file or stream).
    pub protocol: IPCMessageProtocol,
    /// Writer lifecycle state (fresh / schema written / closed).
    pub state: WriterState,
    /// Compression codec applied to data buffers in record-batch bodies.
    pub compression: Compression,
    /// Schema every incoming table is validated against.
    pub schema: Vec<Field>,
    /// Dictionary ids whose dictionary batch has already been emitted.
    pub written_dict_ids: HashSet<i64>,
    /// Registered dictionary values, keyed by dictionary id.
    pub dictionaries: HashMap<i64, Vec<String>>,
    /// Flatbuffer builder reused for all metadata messages.
    pub fbb: flatbuffers::FlatBufferBuilder<'static>,
    // Footer bookkeeping (file protocol only): record-batch block entries.
    blocks_record_batches: Vec<FooterBlockMeta>,
    // Footer bookkeeping (file protocol only): dictionary block entries.
    blocks_dictionaries: Vec<FooterBlockMeta>,
    /// Byte offset of each emitted frame (populated for the file protocol only).
    pub frame_offsets: Vec<u64>,
    /// Encoded frames awaiting consumption.
    pub out_frames: VecDeque<B>,
    /// Running byte offset for the next frame (advanced for the file protocol only).
    pub total_len_offset: u64,
    /// Offset cursor threaded through `IPCFrameEncoder::encode`.
    pub global_offset: usize,
    /// Set by `finish`; lets the `Stream` implementation terminate.
    pub finished: bool,
    /// Waker parked by a pending `poll_next`, woken when frames arrive.
    pub waker: Option<Waker>,
}
impl<B> GTableStreamEncoder<B>
where
B: StreamBuffer,
{
pub fn new(schema: Vec<Field>, protocol: IPCMessageProtocol) -> Self {
Self {
protocol,
state: WriterState::Fresh,
compression: Compression::None,
schema,
written_dict_ids: HashSet::new(),
dictionaries: HashMap::new(),
fbb: flatbuffers::FlatBufferBuilder::with_capacity(4096),
blocks_record_batches: Vec::new(),
blocks_dictionaries: Vec::new(),
frame_offsets: vec![],
out_frames: VecDeque::new(),
total_len_offset: 0,
global_offset: 0,
finished: false,
waker: None,
}
}
/// Creates an encoder for `schema` using `protocol` and the given
/// body-buffer `compression` codec.
pub fn with_compression(
    schema: Vec<Field>,
    protocol: IPCMessageProtocol,
    compression: Compression,
) -> Self {
    // The flatbuffer builder is reused across every metadata message.
    let fbb = flatbuffers::FlatBufferBuilder::with_capacity(4096);
    Self {
        protocol,
        compression,
        schema,
        fbb,
        state: WriterState::Fresh,
        written_dict_ids: HashSet::new(),
        dictionaries: HashMap::new(),
        blocks_record_batches: Vec::new(),
        blocks_dictionaries: Vec::new(),
        frame_offsets: Vec::new(),
        out_frames: VecDeque::new(),
        total_len_offset: 0,
        global_offset: 0,
        finished: false,
        waker: None,
    }
}
/// Registers the unique values for dictionary `id`; replaces any previous
/// registration for the same id.
pub fn register_dictionary(&mut self, id: i64, uniques: Vec<String>) {
    self.dictionaries.insert(id, uniques);
}
/// Encodes the writer schema as a Schema message frame, queues it, and
/// advances the writer state to `SchemaDone`.
pub fn write_schema_frame(&mut self) -> io::Result<()> {
    let schema_meta = build_flatbuf_schema(&mut self.fbb, &self.schema)?;
    // Schema messages carry no body; pass an empty (pre-sized) buffer.
    let empty_body = B::with_capacity(DEFAULT_FRAME_ALLOCATION_SIZE);
    self.emit_frame(schema_meta, empty_body, fbm::MessageHeader::Schema);
    self.state = WriterState::SchemaDone;
    Ok(())
}
/// Encodes `tbl` as a RecordBatch frame, emitting the schema frame first if
/// this is the first write, and any not-yet-written dictionary batches for
/// dictionary-typed columns.
///
/// # Errors
/// Fails if the writer is already closed, if the column count does not
/// match the writer schema, or if encoding fails.
pub fn write_record_batch_frame(&mut self, tbl: &Table) -> io::Result<()> {
    match self.state {
        WriterState::Closed => {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                "writer already finished",
            ));
        }
        // First write: the schema frame must precede all data frames.
        WriterState::Fresh => self.write_schema_frame()?,
        _ => {}
    }
    if tbl.cols.len() != self.schema.len() {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "table column count mismatch with writer schema",
        ));
    }
    // Dictionary ids are the column indices of dictionary-typed fields.
    let dict_ids: Vec<i64> = self
        .schema
        .iter()
        .enumerate()
        .filter(|(_, field)| matches!(field.dtype, ArrowType::Dictionary(_)))
        .map(|(col_idx, _)| col_idx as i64)
        .collect();
    for dict_id in dict_ids {
        self.write_dictionary_frame_if_needed(dict_id)?;
    }
    let (meta, body) = self.encode_record_batch(tbl)?;
    self.emit_frame(meta, body, fbm::MessageHeader::RecordBatch);
    Ok(())
}
pub(crate) fn write_dictionary_frame_if_needed(&mut self, id: i64) -> io::Result<()> {
if self.written_dict_ids.contains(&id) {
return Ok(());
}
let uniques = self.dictionaries.get(&id).ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
format!("dictionary id {id} not registered"),
)
})?;
let (meta, body) = encode_flatbuf_dictionary(&mut self.fbb, id, uniques)?;
self.emit_frame(meta, body, fbm::MessageHeader::DictionaryBatch);
self.written_dict_ids.insert(id);
Ok(())
}
/// Encodes one message (metadata + body) into a wire frame, queues it on
/// `out_frames`, and — for the file protocol — records footer bookkeeping.
fn emit_frame(&mut self, meta: Vec<u8>, body: B, header_type: fbm::MessageHeader) {
    // File protocol: the very first frame is flagged so the encoder can
    // prepend the leading file magic.
    let is_first = self.protocol == IPCMessageProtocol::File && self.frame_offsets.is_empty();
    // The closing frame is produced by `finish`, never here.
    let is_last = false;
    let frame = IPCFrame {
        meta: &meta,
        body: body.as_ref(),
        protocol: self.protocol,
        is_first,
        is_last,
        footer_bytes: None,
    };
    let (encoded, ipc_frame_metadata) =
        IPCFrameEncoder::encode::<B>(&mut self.global_offset, &frame)
            .expect("IPC frame encoding failed");
    debug_assert!(encoded.len() == ipc_frame_metadata.frame_len());
    if self.protocol == IPCMessageProtocol::File {
        // Footer block: where this message starts and how large its
        // metadata and body portions are.
        let block = FooterBlockMeta {
            offset: self.total_len_offset,
            metadata_len: ipc_frame_metadata.metadata_total_len() as u32
                + ipc_frame_metadata.header_len as u32,
            body_len: ipc_frame_metadata.body_total_len() as u64,
        };
        // Only dictionary and record batches are listed in the footer;
        // schema frames still advance the offsets below.
        match header_type {
            fbm::MessageHeader::DictionaryBatch => self.blocks_dictionaries.push(block),
            fbm::MessageHeader::RecordBatch => self.blocks_record_batches.push(block),
            _ => {}
        }
        self.frame_offsets.push(self.total_len_offset);
        self.total_len_offset += ipc_frame_metadata.frame_len() as u64;
    }
    self.out_frames.push_back(encoded);
    // Wake any consumer parked in `poll_next`.
    if let Some(waker) = self.waker.take() {
        waker.wake();
    }
}
/// Terminates the stream: the file protocol appends the footer frame
/// (with trailing magic), the stream protocol appends the end-of-stream
/// marker. Idempotent — a second call on a closed writer is a no-op.
///
/// # Errors
/// Propagates footer construction or frame encoding failures.
pub fn finish(&mut self) -> io::Result<()> {
    if self.state == WriterState::Closed {
        return Ok(());
    }
    match self.protocol {
        IPCMessageProtocol::File => {
            // True only when no frames were ever emitted (empty file):
            // the footer frame then also carries the leading magic.
            let is_first = self.frame_offsets.is_empty();
            let footer_bytes = build_flatbuf_footer(
                &mut self.fbb,
                &self.schema,
                &self.blocks_dictionaries,
                &self.blocks_record_batches,
            )?;
            let frame = IPCFrame {
                meta: &[],
                body: &[],
                protocol: IPCMessageProtocol::File,
                is_first,
                is_last: true,
                footer_bytes: Some(&footer_bytes),
            };
            let (footer_frame, _) =
                IPCFrameEncoder::encode::<B>(&mut self.global_offset, &frame)?;
            self.out_frames.push_back(footer_frame);
        }
        IPCMessageProtocol::Stream => {
            // A fresh writer never emitted a schema, so there is nothing
            // to terminate; otherwise emit the end-of-stream frame.
            if self.state != WriterState::Fresh {
                let frame = IPCFrame {
                    meta: &[],
                    body: &[],
                    protocol: IPCMessageProtocol::Stream,
                    is_first: false,
                    is_last: true,
                    footer_bytes: None,
                };
                let (eos_frame, _) =
                    IPCFrameEncoder::encode::<B>(&mut self.global_offset, &frame)?;
                self.out_frames.push_back(eos_frame);
            }
        }
    }
    self.state = WriterState::Closed;
    self.finished = true;
    // Wake any consumer so it can observe stream termination.
    if let Some(waker) = self.waker.take() {
        waker.wake();
    }
    Ok(())
}
/// Rough upper-bound estimate of the encoded body size for `tbl`, used to
/// pre-size the output buffer. Each column gets a 64-byte overhead
/// allowance for validity/padding.
fn estimate_body_size(&self, tbl: &Table) -> usize {
    let rows = tbl.n_rows;
    tbl.cols
        .iter()
        .map(|col| match &col.array {
            Array::NumericArray(num) => {
                // Fixed-width primitives: bytes-per-value times row count.
                let per_value = match num {
                    NumericArray::Int32(_)
                    | NumericArray::UInt32(_)
                    | NumericArray::Float32(_) => 4,
                    NumericArray::Int64(_)
                    | NumericArray::UInt64(_)
                    | NumericArray::Float64(_) => 8,
                    #[cfg(feature = "extended_numeric_types")]
                    NumericArray::Int8(_) | NumericArray::UInt8(_) => 1,
                    #[cfg(feature = "extended_numeric_types")]
                    NumericArray::Int16(_) | NumericArray::UInt16(_) => 2,
                    _ => 8,
                };
                rows * per_value + 64
            }
            // One bit per row, rounded up to whole bytes.
            Array::BooleanArray(_) => (rows + 7) / 8 + 64,
            // Offsets plus a generous per-string payload guess.
            Array::TextArray(_) => rows * 24 + 64,
            _ => rows * 8 + 64,
        })
        .sum()
}
/// Encodes all columns of `tbl` into a body buffer and builds the
/// RecordBatch flatbuffer metadata. Returns `(metadata, body)`.
pub fn encode_record_batch(&mut self, tbl: &Table) -> io::Result<(Vec<u8>, B)> {
    let mut fb_field_nodes = Vec::with_capacity(tbl.cols.len());
    let mut fb_buffers = Vec::new();
    // Pre-size the body from the estimate, floored at the default frame size.
    let capacity = self.estimate_body_size(tbl).max(DEFAULT_FRAME_ALLOCATION_SIZE);
    let mut body = B::with_capacity(capacity);
    for (col_idx, col) in tbl.cols.iter().enumerate() {
        // Column schema must agree with the writer schema (debug-only check).
        debug_assert!(col.field == self.schema[col_idx].clone().into());
        self.encode_column(
            &col.field,
            &col.array,
            tbl.n_rows,
            col_idx,
            &mut fb_field_nodes,
            &mut fb_buffers,
            &mut body,
        )?;
    }
    let meta = build_flatbuf_recordbatch(
        &mut self.fbb,
        tbl.n_rows,
        &fb_field_nodes,
        &fb_buffers,
        body.len(),
    )?;
    Ok((meta, body))
}
/// Encodes one column's buffers into `body` and appends the corresponding
/// field node and buffer descriptors for the record-batch metadata.
///
/// Categorical columns are passed with `write_dict = true` so their
/// dictionary batch is emitted (once) before the record batch references it.
#[inline(always)]
fn encode_column(
    &mut self,
    field: &Field,
    array: &Array,
    n_rows: usize,
    col_idx: usize,
    fb_field_nodes: &mut Vec<fbm::FieldNode>,
    fb_buffers: &mut Vec<fbm::Buffer>,
    body: &mut B,
) -> io::Result<()> {
    match array {
        // Fixed-width primitives: a single data buffer + optional validity.
        Array::NumericArray(num) => {
            let (data_bytes, null_bitmap) = match num {
                #[cfg(feature = "extended_numeric_types")]
                NumericArray::Int8(arr) => {
                    (as_bytes(arr.data.as_slice()), arr.null_mask.as_ref())
                }
                #[cfg(feature = "extended_numeric_types")]
                NumericArray::Int16(arr) => {
                    (as_bytes(arr.data.as_slice()), arr.null_mask.as_ref())
                }
                NumericArray::Int32(arr) => {
                    (as_bytes(arr.data.as_slice()), arr.null_mask.as_ref())
                }
                NumericArray::Int64(arr) => {
                    (as_bytes(arr.data.as_slice()), arr.null_mask.as_ref())
                }
                #[cfg(feature = "extended_numeric_types")]
                NumericArray::UInt8(arr) => {
                    (as_bytes(arr.data.as_slice()), arr.null_mask.as_ref())
                }
                #[cfg(feature = "extended_numeric_types")]
                NumericArray::UInt16(arr) => {
                    (as_bytes(arr.data.as_slice()), arr.null_mask.as_ref())
                }
                NumericArray::UInt32(arr) => {
                    (as_bytes(arr.data.as_slice()), arr.null_mask.as_ref())
                }
                NumericArray::UInt64(arr) => {
                    (as_bytes(arr.data.as_slice()), arr.null_mask.as_ref())
                }
                NumericArray::Float32(arr) => {
                    (as_bytes(arr.data.as_slice()), arr.null_mask.as_ref())
                }
                NumericArray::Float64(arr) => {
                    (as_bytes(arr.data.as_slice()), arr.null_mask.as_ref())
                }
                _ => {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidInput,
                        "unsupported numeric subtype",
                    ));
                }
            };
            self.encode_field_buffers(
                field.nullable,
                null_bitmap,
                &[data_bytes],
                n_rows,
                col_idx,
                false,
                fb_field_nodes,
                fb_buffers,
                body,
            )
        }
        // Booleans: the packed bit buffer is the single data buffer.
        Array::BooleanArray(arr) => self.encode_field_buffers(
            field.nullable,
            arr.null_mask.as_ref(),
            &[arr.data.bits.as_slice()],
            n_rows,
            col_idx,
            false,
            fb_field_nodes,
            fb_buffers,
            body,
        ),
        // Variable-length strings: offsets buffer followed by value bytes.
        Array::TextArray(TextArray::String32(arr)) => self.encode_field_buffers(
            field.nullable,
            arr.null_mask.as_ref(),
            &[as_bytes(arr.offsets.as_slice()), arr.data.as_slice()],
            n_rows,
            col_idx,
            false,
            fb_field_nodes,
            fb_buffers,
            body,
        ),
        #[cfg(feature = "large_string")]
        Array::TextArray(TextArray::String64(arr)) => self.encode_field_buffers(
            field.nullable,
            arr.null_mask.as_ref(),
            &[as_bytes(arr.offsets.as_slice()), arr.data.as_slice()],
            n_rows,
            col_idx,
            false,
            fb_field_nodes,
            fb_buffers,
            body,
        ),
        // Categoricals: only the index buffer goes in the record batch;
        // `write_dict = true` triggers the dictionary batch emission.
        Array::TextArray(TextArray::Categorical32(arr)) => self.encode_field_buffers(
            field.nullable,
            arr.null_mask.as_ref(),
            &[as_bytes(arr.data.as_slice())],
            n_rows,
            col_idx,
            true,
            fb_field_nodes,
            fb_buffers,
            body,
        ),
        #[cfg(feature = "extended_categorical")]
        Array::TextArray(TextArray::Categorical8(arr)) => self.encode_field_buffers(
            field.nullable,
            arr.null_mask.as_ref(),
            &[as_bytes(arr.data.as_slice())],
            n_rows,
            col_idx,
            true,
            fb_field_nodes,
            fb_buffers,
            body,
        ),
        #[cfg(feature = "extended_categorical")]
        Array::TextArray(TextArray::Categorical16(arr)) => self.encode_field_buffers(
            field.nullable,
            arr.null_mask.as_ref(),
            &[as_bytes(arr.data.as_slice())],
            n_rows,
            col_idx,
            true,
            fb_field_nodes,
            fb_buffers,
            body,
        ),
        #[cfg(feature = "extended_categorical")]
        Array::TextArray(TextArray::Categorical64(arr)) => self.encode_field_buffers(
            field.nullable,
            arr.null_mask.as_ref(),
            &[as_bytes(arr.data.as_slice())],
            n_rows,
            col_idx,
            true,
            fb_field_nodes,
            fb_buffers,
            body,
        ),
        // Temporal arrays encode like fixed-width primitives.
        #[cfg(feature = "datetime")]
        Array::TemporalArray(temp) => {
            let (data_bytes, null_bitmap) = match temp {
                minarrow::TemporalArray::Datetime32(arr) => {
                    (as_bytes(arr.data.as_slice()), arr.null_mask.as_ref())
                }
                minarrow::TemporalArray::Datetime64(arr) => {
                    (as_bytes(arr.data.as_slice()), arr.null_mask.as_ref())
                }
                minarrow::TemporalArray::Null => {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidInput,
                        "null temporal array not supported",
                    ));
                }
            };
            self.encode_field_buffers(
                field.nullable,
                null_bitmap,
                &[data_bytes],
                n_rows,
                col_idx,
                false,
                fb_field_nodes,
                fb_buffers,
                body,
            )
        }
        _ => Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("unsupported column type in writer: {}", field.name),
        )),
    }
}
/// Writes one field's buffers (validity bitmap, then each data region) into
/// `body` and records the matching buffer descriptors and field node.
#[inline(always)]
fn encode_field_buffers(
    &mut self,
    nullable: bool,
    null_bitmap: Option<&Bitmask>,
    data_slices: &[&[u8]],
    n_rows: usize,
    col_idx: usize,
    write_dict: bool,
    fb_field_nodes: &mut Vec<fbm::FieldNode>,
    fb_buffers: &mut Vec<fbm::Buffer>,
    body: &mut B,
) -> io::Result<()> {
    // Dictionary-encoded columns need their dictionary batch on the wire
    // before the record batch that references it.
    if write_dict {
        self.write_dictionary_frame_if_needed(col_idx as i64)?;
    }
    // The validity bitmap always occupies the field's first buffer slot.
    let (null_count, validity_buf) = make_null_buffer(nullable, null_bitmap, body);
    fb_buffers.push(validity_buf);
    // Then each data region (e.g. offsets, values) in declaration order.
    for slice in data_slices.iter().copied() {
        let data_buf = push_buffer_with_compression(body, slice, self.compression)?;
        fb_buffers.push(data_buf);
    }
    fb_field_nodes.push(fbm::FieldNode::new(n_rows as i64, null_count as i64));
    Ok(())
}
}
/// Yields encoded frames as they become available; ends once `finish` has
/// run and the queue is drained.
impl<B> Stream for GTableStreamEncoder<B>
where
    B: StreamBuffer + 'static + Unpin,
{
    type Item = io::Result<B>;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let encoder = self.get_mut();
        match encoder.out_frames.pop_front() {
            Some(frame) => Poll::Ready(Some(Ok(frame))),
            // Queue drained and writer closed: the stream is complete.
            None if encoder.finished => Poll::Ready(None),
            None => {
                // Park until emit_frame / finish wakes us.
                encoder.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}
/// Appends `bytes` to `body`, pads up to the buffer type's alignment, and
/// returns the Arrow `Buffer` descriptor (offset and unpadded length of the
/// data within `body`).
#[inline(always)]
fn push_buffer<B: StreamBuffer>(body: &mut B, bytes: &[u8]) -> fbm::Buffer {
    let offset = body.len();
    // A single extend_from_slice suffices; the previous 1 MiB chunked copy
    // produced byte-identical output with extra loop overhead.
    body.extend_from_slice(bytes);
    let len = bytes.len();
    let pad = align_to::<B>(len);
    if pad != 0 {
        // Padding never exceeds the alignment width, so a 64-byte zero
        // block covers all cases.
        body.extend_from_slice(&[0u8; 64][..pad]);
    }
    fbm::Buffer::new(offset as i64, len as i64)
}
/// Appends `bytes` to `body`, compressing them when a codec is selected.
///
/// Compressed buffers are laid out as an 8-byte little-endian uncompressed
/// length prefix followed by the compressed payload, then padded to the
/// buffer type's alignment.
///
/// # Errors
/// Returns an error if the compression codec fails.
#[inline(always)]
fn push_buffer_with_compression<B: StreamBuffer>(
    body: &mut B,
    bytes: &[u8],
    compression: Compression,
) -> Result<fbm::Buffer, io::Error> {
    let offset = body.len();
    let original_len = bytes.len();
    // Fast path: no codec, write the raw bytes.
    if compression == Compression::None {
        return Ok(push_buffer(body, bytes));
    }
    let compressed = compress(bytes, compression)
        .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("Compression failed: {}", e)))?;
    // 8-byte uncompressed-length prefix, then the compressed payload.
    // (The former 1 MiB chunked copy was a no-op complication; a single
    // extend_from_slice writes the same bytes.)
    body.extend_from_slice(&(original_len as u64).to_le_bytes());
    body.extend_from_slice(&compressed);
    let total_written = 8 + compressed.len();
    let pad = align_to::<B>(total_written);
    if pad != 0 {
        body.extend_from_slice(&[0u8; 64][..pad]);
    }
    // NOTE(review): the descriptor reports the *uncompressed* length; the
    // Arrow IPC spec's buffer metadata normally records the on-wire size
    // (prefix + compressed bytes). Confirm the paired decoder expects the
    // uncompressed length before changing this.
    Ok(fbm::Buffer::new(offset as i64, original_len as i64))
}
/// Writes the validity bitmap for a field when one exists, returning the
/// null count and the buffer descriptor. Non-nullable fields, and nullable
/// fields without a mask, get a zero-length placeholder descriptor.
#[inline(always)]
fn make_null_buffer<B: StreamBuffer>(
    nullable: bool,
    mask: Option<&Bitmask>,
    body: &mut B,
) -> (usize, fbm::Buffer) {
    match mask {
        Some(m) if nullable => (m.null_count(), push_buffer(body, m.bits.as_slice())),
        // No bitmap to write: empty descriptor, zero nulls.
        _ => (0, fbm::Buffer::new(0, 0)),
    }
}
#[cfg(test)]
mod tests {
    use std::fs::File as StdFile;
    use std::io::{Read, Write};
    use std::sync::Arc;
    use minarrow::ffi::arrow_dtype::CategoricalIndexType;
    use minarrow::{
        Array, Bitmask, Buffer, CategoricalArray, Field, FieldArray, IntegerArray, Table,
        TextArray, Vec64,
    };
    use tempfile::NamedTempFile;
    use crate::constants::{ARROW_MAGIC_NUMBER, ARROW_MAGIC_NUMBER_PADDED};
    use super::*;

    /// Builds an LSB-first validity bitmask from per-row validity flags.
    fn make_bitmask(valid: &[bool]) -> Bitmask {
        let mut bits = vec![0u8; (valid.len() + 7) / 8];
        for (i, v) in valid.iter().enumerate() {
            if *v {
                bits[i / 8] |= 1 << (i % 8);
            }
        }
        Bitmask {
            bits: Buffer::from(Vec64::from_slice(&bits[..])),
            len: valid.len(),
        }
    }

    /// Fixture dictionary values shared by the categorical tests.
    fn dict_strs() -> Vec<String> {
        vec![
            "apple".to_string(),
            "banana".to_string(),
            "pear".to_string(),
        ]
    }

    /// Single-column schema with a dictionary field of the given index type.
    fn make_schema(idx_ty: CategoricalIndexType, nullable: bool) -> Vec<Field> {
        vec![Field {
            name: "col".to_string(),
            dtype: ArrowType::Dictionary(idx_ty),
            nullable,
            metadata: Default::default(),
        }]
    }

    /// Wraps a single column into a table with the given row count.
    fn make_table(arr: FieldArray, n_rows: usize) -> Table {
        Table {
            cols: vec![arr],
            n_rows,
            name: "tbl".to_string(),
        }
    }

    /// Reads an entire file back into memory for byte-level assertions.
    fn read_file_bytes(path: &std::path::Path) -> Vec<u8> {
        let mut f = StdFile::open(path).expect("file open");
        let mut buf = Vec::new();
        f.read_to_end(&mut buf).expect("file read");
        buf
    }

    /// Asserts the concatenated frames are 8-byte aligned overall.
    fn check_ipc_padding(buf: &[u8]) {
        assert_eq!(buf.len() % 8, 0, "Arrow IPC frame should be 8-byte aligned");
    }

    /// Stream-protocol write of a categorical (u32-indexed) column; checks
    /// output is non-empty and 8-byte aligned.
    #[test]
    fn test_write_categorical_column_u32_to_file() {
        let temp = NamedTempFile::new().unwrap();
        let path = temp.path().to_path_buf();
        let mut writer = TableStreamEncoder::new(
            make_schema(CategoricalIndexType::UInt32, true),
            IPCMessageProtocol::Stream,
        );
        let arr = CategoricalArray {
            data: Buffer::from(Vec64::from_slice(&[1u32, 0, 2, 1])),
            unique_values: Vec64::from(dict_strs()),
            null_mask: Some(make_bitmask(&[true, false, true, true])),
        };
        // Dictionary id 0 corresponds to column index 0.
        writer.register_dictionary(0, dict_strs());
        let tbl = make_table(
            FieldArray::new(
                Field {
                    name: "col".to_string(),
                    dtype: ArrowType::Dictionary(CategoricalIndexType::UInt32),
                    nullable: true,
                    metadata: Default::default(),
                },
                Array::TextArray(TextArray::Categorical32(Arc::new(arr))),
            ),
            4,
        );
        writer.write_record_batch_frame(&tbl).unwrap();
        writer.finish().unwrap();
        // Drain the queued frames to disk.
        let mut file = StdFile::create(&path).unwrap();
        for frame in writer.out_frames {
            use std::io::Write;
            file.write_all(&frame).unwrap();
        }
        file.flush().unwrap();
        drop(file);
        let buf = read_file_bytes(&path);
        assert!(!buf.is_empty());
        check_ipc_padding(&buf);
    }

    /// File-protocol write of a categorical column; checks leading and
    /// trailing Arrow magic bytes.
    #[test]
    fn test_ipc_file_write_read_dict() {
        let temp = NamedTempFile::new().unwrap();
        let path = temp.path().to_path_buf();
        let mut writer = TableStreamEncoder::new(
            make_schema(CategoricalIndexType::UInt32, true),
            IPCMessageProtocol::File,
        );
        let arr = CategoricalArray {
            data: Buffer::from(Vec64::from_slice(&[0u32, 1, 1, 2])),
            unique_values: Vec64::from(dict_strs()),
            null_mask: Some(make_bitmask(&[true, false, true, true])),
        };
        writer.register_dictionary(0, dict_strs());
        let tbl = make_table(
            FieldArray::new(
                Field {
                    name: "col".to_string(),
                    dtype: ArrowType::Dictionary(CategoricalIndexType::UInt32),
                    nullable: true,
                    metadata: Default::default(),
                },
                Array::TextArray(TextArray::Categorical32(Arc::new(arr))),
            ),
            4,
        );
        writer.write_record_batch_frame(&tbl).unwrap();
        writer.finish().unwrap();
        let mut file = StdFile::create(&path).unwrap();
        for frame in writer.out_frames {
            use std::io::Write;
            file.write_all(&frame).unwrap();
        }
        file.flush().unwrap();
        drop(file);
        let buf = read_file_bytes(&path);
        println!("Written buffer:\n{:?}", buf);
        assert!(
            buf.starts_with(ARROW_MAGIC_NUMBER_PADDED),
            "file must start with Arrow magic"
        );
        assert!(
            buf.ends_with(ARROW_MAGIC_NUMBER),
            "file must end with Arrow magic"
        );
        println!("Written buffer len : {:?}", buf.len());
        // The trailing footer-length word (4) + magic (2 accounted here)
        // offsets the alignment check below.
        assert!(
            (buf.len() + 4 + 2) % 8 == 0,
            "Arrow IPC file must be a multiple of 8 bytes"
        );
    }

    /// File-protocol write of a plain Int32 column via the Vec64-backed
    /// encoder; checks leading and trailing Arrow magic bytes.
    #[test]
    fn test_ipc_file_write_read_std() {
        let temp = NamedTempFile::new().unwrap();
        let path = temp.path().to_path_buf();
        let schema = vec![Field {
            name: "col".to_string(),
            dtype: ArrowType::Int32,
            nullable: true,
            metadata: Default::default(),
        }];
        let mut writer = TableStreamEncoder64::new(schema.clone(), IPCMessageProtocol::File);
        let data = vec![10i32, 20, 30, 40];
        let mask = make_bitmask(&[true, false, true, true]);
        let arr = NumericArray::Int32(Arc::new(IntegerArray {
            data: Buffer::from(Vec64::from_slice(&data)),
            null_mask: Some(mask),
        }));
        let tbl = make_table(
            FieldArray::new(
                Field {
                    name: "col".to_string(),
                    dtype: ArrowType::Int32,
                    nullable: true,
                    metadata: Default::default(),
                },
                Array::NumericArray(arr),
            ),
            4,
        );
        writer.write_record_batch_frame(&tbl).unwrap();
        writer.finish().unwrap();
        let mut file = StdFile::create(&path).unwrap();
        for frame in writer.out_frames {
            use std::io::Write;
            file.write_all(&frame).unwrap();
        }
        file.flush().unwrap();
        drop(file);
        let buf = read_file_bytes(&path);
        println!("Written buffer:\n{:?}", buf);
        assert!(
            buf.starts_with(ARROW_MAGIC_NUMBER_PADDED),
            "file must start with Arrow magic"
        );
        assert!(
            buf.ends_with(ARROW_MAGIC_NUMBER),
            "file must end with Arrow magic"
        );
    }
}