use std::io::{Read, Seek, SeekFrom};
use std::{collections::VecDeque, convert::TryInto};
use crate::buffer::Buffer;
use crate::error::{ArrowError, Result};
use crate::{bitmap::Bitmap, types::NativeType};
use super::super::compression;
use super::super::endianess::is_native_little_endian;
use super::{Compression, IpcBuffer, Node};
/// Reads `length` values of type `T` from `reader` into `buffer`, converting each value
/// from the file's byte order because it differs from this machine's.
///
/// This is only called when the file's endianness does not match the native one.
/// When the file is big-endian (`is_little_endian == false`), every value is decoded with
/// `T::from_be_bytes`.
///
/// # Errors
/// - Returns [`ArrowError::NotYetImplemented`] when the file is little-endian, i.e. the
///   reading machine is big-endian. This is checked before any allocation or read, so
///   the reader is not advanced in that case.
/// - Propagates I/O errors from reading the raw bytes.
fn read_swapped<T: NativeType, R: Read + Seek>(
    reader: &mut R,
    length: usize,
    buffer: &mut Vec<T>,
    is_little_endian: bool,
) -> Result<()> {
    // Reject the unsupported direction up front instead of reading the data first.
    if is_little_endian {
        return Err(ArrowError::NotYetImplemented(
            "Reading little endian files from big endian machines".to_string(),
        ));
    }

    let item_size = std::mem::size_of::<T>();
    let mut raw = vec![0u8; length * item_size];
    reader.read_exact(&mut raw)?;

    for (slot, chunk) in buffer.iter_mut().zip(raw.chunks_exact(item_size)) {
        let bytes: T::Bytes = match chunk.try_into() {
            Ok(bytes) => bytes,
            // `chunks_exact` yields chunks of exactly `size_of::<T>()` bytes.
            Err(_) => unreachable!(),
        };
        *slot = T::from_be_bytes(bytes);
    }
    Ok(())
}
/// Reads `length` values of type `T` from an uncompressed IPC buffer of `buffer_length` bytes.
///
/// When the file's endianness matches the machine's, the bytes are read straight into the
/// output vector. Otherwise every value is byte-swapped via `read_swapped`.
///
/// # Errors
/// - [`ArrowError::OutOfSpec`] if `length * size_of::<T>()` overflows `usize`, or exceeds
///   `buffer_length`. Both values come from the (possibly corrupted) file.
/// - Any error from `read_swapped` (including unsupported endianness) or from reading.
fn read_uncompressed_buffer<T: NativeType, R: Read + Seek>(
    reader: &mut R,
    buffer_length: usize,
    length: usize,
    is_little_endian: bool,
) -> Result<Vec<T>> {
    // `length` is untrusted; an unchecked multiplication could wrap around in release
    // builds and slip past the bounds check below.
    let bytes = length
        .checked_mul(std::mem::size_of::<T>())
        .ok_or_else(|| {
            ArrowError::OutOfSpec(format!(
                "The array reports {} slots of physical type \"{}\", whose size in bytes \
                overflows the addressable range",
                length,
                std::any::type_name::<T>(),
            ))
        })?;
    if bytes > buffer_length {
        return Err(ArrowError::OutOfSpec(
            format!("The slots of the array times the physical size must \
            be smaller or equal to the length of the IPC buffer. \
            However, this array reports {} slots, which, for physical type \"{}\", corresponds to {} bytes, \
            which is larger than the buffer length {}",
                length,
                std::any::type_name::<T>(),
                bytes,
                buffer_length,
            ),
        ));
    }

    let mut buffer = vec![T::default(); length];
    if is_native_little_endian() == is_little_endian {
        // Same byte order: the in-memory representation equals the on-disk one.
        let slice = bytemuck::cast_slice_mut(&mut buffer);
        reader.read_exact(slice)?;
    } else {
        read_swapped(reader, length, &mut buffer, is_little_endian)?;
    }
    Ok(buffer)
}
/// Reads a compressed IPC buffer of `buffer_length` bytes and decompresses it into
/// `length` values of type `T`.
///
/// The first 8 bytes of the IPC buffer are skipped before decompression. Per the Arrow
/// IPC specification they hold the uncompressed length.
///
/// # Errors
/// - [`ArrowError::NotYetImplemented`] if the file's endianness differs from the machine's.
/// - [`ArrowError::OutOfSpec`] if the buffer is shorter than its 8-byte prefix.
/// - Errors from resolving the codec, from reading, or from decompression.
fn read_compressed_buffer<T: NativeType, R: Read + Seek>(
    reader: &mut R,
    buffer_length: usize,
    length: usize,
    is_little_endian: bool,
    compression: Compression,
) -> Result<Vec<T>> {
    if is_little_endian != is_native_little_endian() {
        return Err(ArrowError::NotYetImplemented(
            "Reading compressed and big endian IPC".to_string(),
        ));
    }
    // A corrupted file may declare a buffer too short to hold the 8-byte prefix; slicing
    // `[8..]` would then panic instead of reporting an error.
    if buffer_length < 8 {
        return Err(ArrowError::OutOfSpec(format!(
            "A compressed IPC buffer must contain at least 8 bytes (its uncompressed length \
            prefix), but this buffer only has {} bytes",
            buffer_length
        )));
    }

    let mut buffer = vec![T::default(); length];
    let mut slice = vec![0u8; buffer_length];
    reader.read_exact(&mut slice)?;

    // NOTE(review): the Arrow spec allows a prefix of -1 to mean "body stored
    // uncompressed"; that case is not handled here.
    let compressed = &slice[8..];
    let out_slice = bytemuck::cast_slice_mut(&mut buffer);
    match compression.codec()? {
        arrow_format::ipc::CompressionType::Lz4Frame => {
            compression::decompress_lz4(compressed, out_slice)?;
        }
        arrow_format::ipc::CompressionType::Zstd => {
            compression::decompress_zstd(compressed, out_slice)?;
        }
    }
    Ok(buffer)
}
pub fn read_buffer<T: NativeType, R: Read + Seek>(
buf: &mut VecDeque<IpcBuffer>,
length: usize, reader: &mut R,
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
) -> Result<Buffer<T>> {
let buf = buf
.pop_front()
.ok_or_else(|| ArrowError::oos("IPC: unable to fetch a buffer. The file is corrupted."))?;
reader.seek(SeekFrom::Start(block_offset + buf.offset() as u64))?;
let buffer_length = buf.length() as usize;
if let Some(compression) = compression {
Ok(
read_compressed_buffer(reader, buffer_length, length, is_little_endian, compression)?
.into(),
)
} else {
Ok(read_uncompressed_buffer(reader, buffer_length, length, is_little_endian)?.into())
}
}
/// Reads an uncompressed validity/boolean bitmap of `bytes` bytes for an array of `length`
/// slots.
///
/// All `bytes` bytes are returned, including any trailing padding.
///
/// # Errors
/// - [`ArrowError::OutOfSpec`] if the buffer holds fewer than `length` bits.
/// - I/O errors from reading (e.g. the reader ends before `bytes` bytes).
fn read_uncompressed_bitmap<R: Read + Seek>(
    length: usize,
    bytes: usize,
    reader: &mut R,
) -> Result<Vec<u8>> {
    // `bytes` comes from the file; `bytes * 8` must not overflow (panic in debug, wrap in
    // release). If it would overflow, the buffer has more bits than any `usize` length.
    let too_small = bytes.checked_mul(8).map_or(false, |bits| length > bits);
    if too_small {
        return Err(ArrowError::OutOfSpec(format!(
            "An array requires a bitmap with at least the same number of bits as slots. \
            However, this array reports {} slots but the the bitmap in IPC only contains \
            {} bits",
            length,
            bytes * 8,
        )));
    }
    let mut buffer = vec![0; bytes];
    reader.read_exact(buffer.as_mut_slice())?;
    Ok(buffer)
}
fn read_compressed_bitmap<R: Read + Seek>(
length: usize,
bytes: usize,
compression: Compression,
reader: &mut R,
) -> Result<Vec<u8>> {
let mut buffer = vec![0; (length + 7) / 8];
let mut slice = vec![0u8; bytes];
reader.read_exact(&mut slice)?;
match compression.codec()? {
arrow_format::ipc::CompressionType::Lz4Frame => {
compression::decompress_lz4(&slice[8..], &mut buffer)?;
Ok(buffer)
}
arrow_format::ipc::CompressionType::Zstd => {
compression::decompress_zstd(&slice[8..], &mut buffer)?;
Ok(buffer)
}
}
}
pub fn read_bitmap<R: Read + Seek>(
buf: &mut VecDeque<IpcBuffer>,
length: usize,
reader: &mut R,
block_offset: u64,
_: bool,
compression: Option<Compression>,
) -> Result<Bitmap> {
let buf = buf
.pop_front()
.ok_or_else(|| ArrowError::oos("IPC: unable to fetch a buffer. The file is corrupted."))?;
reader.seek(SeekFrom::Start(block_offset + buf.offset() as u64))?;
let bytes = buf.length() as usize;
let buffer = if let Some(compression) = compression {
read_compressed_bitmap(length, bytes, compression, reader)
} else {
read_uncompressed_bitmap(length, bytes, reader)
}?;
Ok(Bitmap::from_bytes(buffer.into(), length))
}
/// Reads the validity bitmap described by `field_node`, if it has any nulls.
///
/// Returns `Ok(None)` when the node reports a null count of zero. In that case the
/// corresponding buffer descriptor is still popped from `buffers`, so that the next buffer
/// read starts from the following descriptor. Otherwise the bitmap is read with
/// `read_bitmap` for `field_node.length()` slots.
///
/// # Errors
/// Returns an out-of-spec error if `buffers` is empty, and propagates any error from
/// `read_bitmap`.
pub fn read_validity<R: Read + Seek>(
    buffers: &mut VecDeque<IpcBuffer>,
    field_node: Node,
    reader: &mut R,
    block_offset: u64,
    is_little_endian: bool,
    compression: Option<Compression>,
) -> Result<Option<Bitmap>> {
    let has_nulls = field_node.null_count() > 0;
    if !has_nulls {
        // Discard the (unused) validity descriptor to keep the queue in step.
        buffers.pop_front().ok_or_else(|| {
            ArrowError::oos("IPC: unable to fetch a buffer. The file is corrupted.")
        })?;
        return Ok(None);
    }

    let slots = field_node.length() as usize;
    let validity = read_bitmap(
        buffers,
        slots,
        reader,
        block_offset,
        is_little_endian,
        compression,
    )?;
    Ok(Some(validity))
}