use std::io;
use crate::constants::{
ARROW_MAGIC_NUMBER, ARROW_MAGIC_NUMBER_PADDED, CONTINUATION_MARKER_LEN,
DEFAULT_FRAME_ALLOCATION_SIZE, EOS_MARKER_LEN, METADATA_SIZE_PREFIX,
};
use crate::enums::IPCMessageProtocol;
use crate::models::frames::ipc_message::IPCFrameMetadata;
use crate::traits::frame_encoder::FrameEncoder;
use crate::traits::stream_buffer::StreamBuffer;
use crate::utils::align_to;
/// One unit of input for [`IPCFrameEncoder`]: an optional encapsulated message
/// plus flags telling the encoder which stream/file framing to emit around it.
///
/// All byte slices are borrowed; the encoder copies them into its output buffer.
pub struct IPCFrame<'a> {
/// Message metadata bytes. When non-empty (or `body` is non-empty) they are written
/// after the continuation marker and the `u32` size prefix, then zero-padded.
pub meta: &'a [u8],
/// Message body bytes, written after the padded metadata and then zero-padded.
pub body: &'a [u8],
/// Selects the framing: `File` adds the leading/trailing magic and footer,
/// `Stream` only adds the end-of-stream marker on the last frame.
pub protocol: IPCMessageProtocol,
/// For `File`, the padded magic number is written before this frame's message.
pub is_first: bool,
/// For `File` and `Stream`, an end-of-stream marker is written after the message.
/// For `File`, the footer, its length and the trailing magic follow.
pub is_last: bool,
/// Footer payload. Must be `Some` when `protocol` is `File` and `is_last` is set,
/// otherwise encoding fails; it is ignored in every other case.
pub footer_bytes: Option<&'a [u8]>,
}
/// Stateless encoder that serialises an [`IPCFrame`] into bytes via its
/// `FrameEncoder` implementation.
pub struct IPCFrameEncoder;
impl FrameEncoder for IPCFrameEncoder {
    type Frame<'a> = IPCFrame<'a>;
    type Metadata = IPCFrameMetadata;

    /// Encodes one IPC frame into a freshly allocated buffer of type `B`.
    ///
    /// Bytes are appended in this order:
    /// 1. `File` + `is_first`: the padded magic number.
    /// 2. If `meta` or `body` is non-empty: the message frame (continuation marker,
    ///    metadata size, padded metadata, padded body).
    /// 3. `is_last` with `File` or `Stream`: the end-of-stream marker.
    /// 4. `File` + `is_last`: the footer bytes, their `u32` length and the magic number.
    ///
    /// `global_offset` is the caller's running byte offset in the whole output. It is
    /// advanced by every byte appended, and padding is aligned relative to it.
    ///
    /// # Errors
    /// Returns an [`io::ErrorKind::InvalidInput`] error when `protocol` is `File` and
    /// `is_last` is set but `footer_bytes` is `None`. This is checked before anything
    /// is written, so `global_offset` is left untouched on error.
    fn encode<'a, B: StreamBuffer>(
        global_offset: &mut usize,
        frame: &Self::Frame<'a>,
    ) -> io::Result<(B, Self::Metadata)> {
        let is_file = frame.protocol == IPCMessageProtocol::File;
        let is_stream = frame.protocol == IPCMessageProtocol::Stream;

        // Validate up front so an invalid frame never half-advances `global_offset`.
        let footer = match (is_file && frame.is_last, frame.footer_bytes) {
            (false, _) => None,
            (true, Some(bytes)) => Some(bytes),
            (true, None) => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    "`is_last` must include footer bytes for IPCMessageProtocol::File.",
                ));
            }
        };

        let mut out = B::with_capacity(DEFAULT_FRAME_ALLOCATION_SIZE);
        let mut ipc_frame_metadata = IPCFrameMetadata::default();

        if is_file && frame.is_first {
            out.extend_from_slice(ARROW_MAGIC_NUMBER_PADDED);
            ipc_frame_metadata.magic_len = ARROW_MAGIC_NUMBER_PADDED.len();
            *global_offset += ipc_frame_metadata.magic_len;
        }

        // A frame may carry only framing (e.g. just the EOS/footer); skip the
        // message frame entirely when there is nothing to encapsulate.
        if !frame.meta.is_empty() || !frame.body.is_empty() {
            Self::append_message_frame(
                global_offset,
                &mut out,
                frame.meta,
                frame.body,
                &mut ipc_frame_metadata,
            );
        }

        if frame.is_last && (is_file || is_stream) {
            Self::append_eos_marker(global_offset, &mut out, &mut ipc_frame_metadata);
        }

        if let Some(footer_bytes) = footer {
            Self::append_file_footer(
                global_offset,
                &mut out,
                footer_bytes,
                &mut ipc_frame_metadata,
            );
            // Report the footer payload size explicitly rather than relying on
            // whatever the helper recorded.
            ipc_frame_metadata.footer_len = footer_bytes.len();
        }

        Ok((out, ipc_frame_metadata))
    }
}
impl IPCFrameEncoder {
    /// Little-endian `0xFFFF_FFFF` continuation marker that opens every message
    /// frame and the end-of-stream marker.
    const CONTINUATION_BYTES: [u8; 4] = 0xFFFF_FFFFu32.to_le_bytes();

    /// Appends the file trailer: `footer_bytes`, their length as a little-endian
    /// `u32`, and the (unpadded) magic number.
    ///
    /// Advances `global_offset` by every byte written and updates `ipc_frame_meta`:
    /// - `footer_len` is set to `footer_bytes.len()` (the footer payload only);
    /// - `magic_len` is incremented by the trailing magic length, so a frame that
    ///   already wrote the leading magic (first and last frame) reports both.
    pub fn append_file_footer<B: StreamBuffer>(
        global_offset: &mut usize,
        out: &mut B,
        footer_bytes: &[u8],
        ipc_frame_meta: &mut IPCFrameMetadata,
    ) {
        out.extend_from_slice(footer_bytes);
        *global_offset += footer_bytes.len();

        // NOTE(review): `as u32` silently truncates footers larger than u32::MAX bytes.
        let footer_size = (footer_bytes.len() as u32).to_le_bytes();
        out.extend_from_slice(&footer_size);
        *global_offset += footer_size.len();
        // `out` may already hold a message frame and the EOS marker, so record the
        // footer payload size rather than the buffer length.
        ipc_frame_meta.footer_len = footer_bytes.len();

        out.extend_from_slice(ARROW_MAGIC_NUMBER);
        ipc_frame_meta.magic_len += ARROW_MAGIC_NUMBER.len();
        *global_offset += ARROW_MAGIC_NUMBER.len();
    }

    /// Appends the end-of-stream marker: the continuation marker followed by a
    /// zero `u32` metadata length, and records `EOS_MARKER_LEN` as `eos_len`.
    fn append_eos_marker<B: StreamBuffer>(
        global_offset: &mut usize,
        out: &mut B,
        ipc_frame_meta: &mut IPCFrameMetadata,
    ) {
        out.extend_from_slice(&Self::CONTINUATION_BYTES);
        out.extend_from_slice(&0u32.to_le_bytes());
        ipc_frame_meta.eos_len = EOS_MARKER_LEN;
        *global_offset += EOS_MARKER_LEN;
    }

    /// Appends one encapsulated message: continuation marker, `u32` metadata size
    /// (metadata plus its padding), the metadata, zero padding, the body, and zero
    /// padding.
    ///
    /// Both paddings are computed with `align_to::<B>` against the running
    /// `global_offset`, so alignment is relative to the whole output rather than
    /// to this buffer.
    fn append_message_frame<B: StreamBuffer>(
        global_offset: &mut usize,
        out: &mut B,
        meta: &[u8],
        body: &[u8],
        ipc_frame_meta: &mut IPCFrameMetadata,
    ) {
        ipc_frame_meta.header_len = CONTINUATION_MARKER_LEN + METADATA_SIZE_PREFIX;
        ipc_frame_meta.meta_len = meta.len();
        ipc_frame_meta.body_len = body.len();
        // Capacity hint only: the paddings are not computed yet.
        out.reserve(ipc_frame_meta.frame_len());

        out.extend_from_slice(&Self::CONTINUATION_BYTES);
        *global_offset += Self::CONTINUATION_BYTES.len();

        // Pad so the metadata ends on an aligned boundary; the size prefix written
        // next sits between the current offset and the metadata.
        let size_prefix_len = std::mem::size_of::<u32>();
        ipc_frame_meta.meta_pad = align_to::<B>(*global_offset + size_prefix_len + meta.len());
        let metadata_size =
            (ipc_frame_meta.meta_len as u32 + ipc_frame_meta.meta_pad as u32).to_le_bytes();
        out.extend_from_slice(&metadata_size);
        *global_offset += metadata_size.len();

        out.extend_from_slice(meta);
        *global_offset += meta.len();
        if ipc_frame_meta.meta_pad != 0 {
            out.extend_from_slice(&vec![0u8; ipc_frame_meta.meta_pad]);
            *global_offset += ipc_frame_meta.meta_pad;
        }

        out.extend_from_slice(body);
        *global_offset += body.len();
        ipc_frame_meta.body_pad = align_to::<B>(*global_offset);
        if ipc_frame_meta.body_pad != 0 {
            out.extend_from_slice(&vec![0u8; ipc_frame_meta.body_pad]);
            *global_offset += ipc_frame_meta.body_pad;
        }
    }
}
#[cfg(test)]
mod tests {
    use minarrow::{Vec64, vec64};

    use super::*;
    use crate::enums::IPCMessageProtocol;

    /// Little-endian continuation marker that opens message frames and the EOS marker.
    const CONTINUATION: [u8; 4] = 0xFFFF_FFFFu32.to_le_bytes();

    /// Builds a frame that is neither the first nor the last one and has no footer.
    fn middle_frame<'a>(
        meta: &'a [u8],
        body: &'a [u8],
        protocol: IPCMessageProtocol,
    ) -> IPCFrame<'a> {
        IPCFrame {
            meta,
            body,
            protocol,
            is_first: false,
            is_last: false,
            footer_bytes: None,
        }
    }

    /// Reads the little-endian `u32` stored at `buf[at..at + 4]`.
    fn read_u32_le(buf: &[u8], at: usize) -> u32 {
        u32::from_le_bytes([buf[at], buf[at + 1], buf[at + 2], buf[at + 3]])
    }

    #[test]
    fn test_ipc_frame_metadata_calculations() {
        let mut metadata = IPCFrameMetadata::default();
        metadata.header_len = 8;
        metadata.meta_len = 120;
        metadata.meta_pad = 8;
        metadata.body_len = 16;
        metadata.body_pad = 0;
        assert_eq!(metadata.metadata_total_len(), 128);
        assert_eq!(metadata.body_total_len(), 16);
        assert_eq!(metadata.frame_len(), 152);
    }

    #[test]
    fn test_empty_stream_frame() {
        let frame = middle_frame(&[], &[], IPCMessageProtocol::Stream);
        let (out, metadata) = IPCFrameEncoder::encode::<Vec64<u8>>(&mut 0, &frame).unwrap();
        assert_eq!(out.len(), 0);
        assert_eq!(metadata.frame_len(), 0);
    }

    #[test]
    fn test_stream_message_frame() {
        let meta_buf = vec![0u8; 120];
        let body_buf = vec![1u8; 16];
        let frame = middle_frame(&meta_buf, &body_buf, IPCMessageProtocol::Stream);
        let (out, metadata) = IPCFrameEncoder::encode::<Vec64<u8>>(&mut 0, &frame).unwrap();

        assert_eq!(&out.0[0..4], &CONTINUATION);
        let meta_size = read_u32_le(&out.0[..], 4);
        assert_eq!(meta_size, (metadata.meta_len + metadata.meta_pad) as u32);
        assert_eq!(metadata.header_len, 8);
        assert_eq!(metadata.meta_len, 120);
        // 4 + 4 + 120 = 128 is already 64-byte aligned.
        assert_eq!(metadata.meta_pad, 0);
        assert_eq!(metadata.body_len, 16);
        // The body ends at 144 and is padded up to 192.
        assert_eq!(metadata.body_pad, 48);
        assert_eq!(out.0.len(), 8 + 120 + 16 + 48);
        assert_eq!(out.0.len(), metadata.frame_len());
    }

    #[test]
    fn test_file_first_frame_with_magic() {
        let meta_buf = vec![0u8; 120];
        let body_buf = vec![1u8; 16];
        let frame = IPCFrame {
            meta: &meta_buf,
            body: &body_buf,
            protocol: IPCMessageProtocol::File,
            is_first: true,
            is_last: false,
            footer_bytes: None,
        };
        let (out, metadata) = IPCFrameEncoder::encode::<Vec64<u8>>(&mut 0, &frame).unwrap();

        assert_eq!(&out.0[0..8], ARROW_MAGIC_NUMBER_PADDED);
        assert_eq!(metadata.magic_len, 8);
        assert_eq!(&out.0[8..12], &CONTINUATION);
        // The metadata size prefix follows the continuation marker, after the magic.
        let meta_size = read_u32_le(&out.0[..], 12);
        assert_eq!(meta_size as usize, metadata.meta_len + metadata.meta_pad);
    }

    #[test]
    fn test_file_regular_frame() {
        let meta_buf = vec![0u8; 120];
        let body_buf = vec![1u8; 16];

        let file_frame = middle_frame(&meta_buf, &body_buf, IPCMessageProtocol::File);
        let (file_out, metadata) =
            IPCFrameEncoder::encode::<Vec64<u8>>(&mut 0, &file_frame).unwrap();
        assert_eq!(metadata.magic_len, 0);
        assert_eq!(&file_out.0[0..4], &CONTINUATION);

        // A middle File frame carries no extra framing, so it matches the Stream encoding.
        let stream_frame = middle_frame(&meta_buf, &body_buf, IPCMessageProtocol::Stream);
        let (stream_out, _) =
            IPCFrameEncoder::encode::<Vec64<u8>>(&mut 0, &stream_frame).unwrap();
        assert_eq!(file_out.0.len(), stream_out.0.len());
    }

    #[test]
    fn test_stream_eos_marker() {
        let frame = IPCFrame {
            meta: &[],
            body: &[],
            protocol: IPCMessageProtocol::Stream,
            is_first: false,
            is_last: true,
            footer_bytes: None,
        };
        let (out, metadata) = IPCFrameEncoder::encode::<Vec64<u8>>(&mut 0, &frame).unwrap();
        assert_eq!(out.0.len(), 8);
        assert_eq!(&out.0[0..4], &CONTINUATION);
        assert_eq!(&out.0[4..8], &0u32.to_le_bytes());
        assert_eq!(metadata.eos_len, 8);
    }

    #[test]
    fn test_file_footer() {
        let footer = vec![2u8; 100];
        let frame = IPCFrame {
            meta: &[],
            body: &[],
            protocol: IPCMessageProtocol::File,
            is_first: false,
            is_last: true,
            footer_bytes: Some(&footer),
        };
        let (out, _) = IPCFrameEncoder::encode::<Vec64<u8>>(&mut 0, &frame).unwrap();

        // EOS (8) + footer (100) + footer length (4) + magic (6).
        let expected_len = 8 + 100 + 4 + 6;
        assert_eq!(out.0.len(), expected_len);
        assert_eq!(&out.0[0..4], &CONTINUATION);
        assert_eq!(&out.0[4..8], &0u32.to_le_bytes());
        assert_eq!(&out.0[8..108], &footer[..]);
        assert_eq!(read_u32_le(&out.0[..], 108), 100);
        assert_eq!(&out.0[112..118], ARROW_MAGIC_NUMBER);
    }

    #[test]
    fn test_padding_calculations() {
        // (metadata size, expected metadata padding) for an 8-byte-aligned `Vec<u8>`.
        let test_cases = [(1, 7), (7, 1), (8, 0), (9, 7), (120, 0), (121, 7)];
        for (meta_size, expected_pad) in test_cases {
            let meta = vec![0u8; meta_size];
            let body = vec![1u8; 8];
            let frame = middle_frame(&meta, &body, IPCMessageProtocol::Stream);
            let (out, metadata) = IPCFrameEncoder::encode::<Vec<u8>>(&mut 0, &frame).unwrap();
            assert_eq!(
                metadata.meta_pad, expected_pad,
                "Failed for meta_size={}",
                meta_size
            );

            // Padding sits right after the 8-byte header and the metadata.
            let pad_start = 8 + meta_size;
            let pad_end = pad_start + expected_pad;
            if expected_pad > 0 {
                assert!(out[pad_start..pad_end].iter().all(|&b| b == 0));
            }
        }
    }

    #[test]
    fn test_body_padding_vec() {
        let meta = vec![0u8; 8];
        let test_cases = [(1, 7), (8, 0), (15, 1), (16, 0)];
        for (body_size, expected_pad) in test_cases {
            let body = vec![1u8; body_size];
            let frame = middle_frame(&meta, &body, IPCMessageProtocol::Stream);
            let (_out, metadata) = IPCFrameEncoder::encode::<Vec<u8>>(&mut 0, &frame).unwrap();
            assert_eq!(
                metadata.body_pad, expected_pad,
                "Failed for body_size={}",
                body_size
            );
        }
    }

    #[test]
    fn test_body_padding_vec64() {
        let meta = vec64![0u8; 8];
        let test_cases = [(1, 63), (8, 56), (15, 49), (16, 48)];
        for (body_size, expected_pad) in test_cases {
            let body = vec![1u8; body_size];
            let frame = middle_frame(&meta, &body, IPCMessageProtocol::Stream);
            let (_out, metadata) = IPCFrameEncoder::encode::<Vec64<u8>>(&mut 0, &frame).unwrap();
            assert_eq!(
                metadata.body_pad, expected_pad,
                "Failed for body_size={}",
                body_size
            );
        }
    }

    #[test]
    fn test_metadata_size_field() {
        let meta = vec![0u8; 120];
        let body = vec![1u8; 16];
        let frame = middle_frame(&meta, &body, IPCMessageProtocol::Stream);
        let (out, metadata) = IPCFrameEncoder::encode::<Vec64<u8>>(&mut 0, &frame).unwrap();

        let meta_size = read_u32_le(&out.0[..], 4);
        // Everything that is not body, EOS/footer, magic or header is padded metadata.
        let expected_meta_total = out.len()
            - metadata.body_total_len()
            - metadata.footer_eos_len()
            - metadata.magic_len
            - metadata.header_len;
        assert_eq!(meta_size, (metadata.meta_len + metadata.meta_pad) as u32);
        assert_eq!(meta_size, expected_meta_total as u32);
    }

    #[test]
    #[should_panic(expected = "`is_last` must include footer bytes for IPCMessageProtocol::File.")]
    fn test_file_last_without_footer_panics() {
        let frame = IPCFrame {
            meta: &[],
            body: &[],
            protocol: IPCMessageProtocol::File,
            is_first: false,
            is_last: true,
            footer_bytes: None,
        };
        let _ = IPCFrameEncoder::encode::<Vec64<u8>>(&mut 0, &frame).unwrap();
    }

    #[test]
    fn test_full_file_sequence() {
        // One running offset threaded through every frame, as a real writer would,
        // so alignment of later frames depends on earlier ones.
        let mut offset = 0usize;

        let schema_meta = vec![0u8; 120];
        let frame1 = IPCFrame {
            meta: &schema_meta,
            body: &[],
            protocol: IPCMessageProtocol::File,
            is_first: true,
            is_last: false,
            footer_bytes: None,
        };
        let (out1, meta1) = IPCFrameEncoder::encode::<Vec64<u8>>(&mut offset, &frame1).unwrap();
        assert_eq!(meta1.magic_len, 8);
        assert_eq!(offset, out1.0.len());

        let batch_meta = vec![0u8; 136];
        let batch_body = vec![1u8; 16];
        let frame2 = middle_frame(&batch_meta, &batch_body, IPCMessageProtocol::File);
        let (out2, meta2) = IPCFrameEncoder::encode::<Vec64<u8>>(&mut offset, &frame2).unwrap();
        assert_eq!(meta2.magic_len, 0);
        assert_eq!(meta2.body_len, 16);
        assert_eq!(&out2.0[0..4], &CONTINUATION);
        assert_eq!(offset, out1.0.len() + out2.0.len());

        let footer_data = vec![2u8; 88];
        let frame3 = IPCFrame {
            meta: &[],
            body: &[],
            protocol: IPCMessageProtocol::File,
            is_first: false,
            is_last: true,
            footer_bytes: Some(&footer_data),
        };
        let (out3, meta3) = IPCFrameEncoder::encode::<Vec64<u8>>(&mut offset, &frame3).unwrap();
        assert_eq!(meta3.eos_len, 8);
        assert_eq!(meta3.footer_len, 88);
        assert_eq!(meta3.magic_len, 6);
        assert!(out3.0.ends_with(ARROW_MAGIC_NUMBER));
        // The running offset must account for every byte emitted across the file.
        assert_eq!(offset, out1.0.len() + out2.0.len() + out3.0.len());
    }
}