use std::sync::Arc;
use tempfile::NamedTempFile;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use lightstream::compression::Compression;
use lightstream::enums::IPCMessageProtocol;
use lightstream::models::writers::ipc::table_writer::TableWriter;
use minarrow::ffi::arrow_dtype::ArrowType;
use minarrow::{Array, Buffer, Field, FieldArray, IntegerArray, NumericArray, Table, Vec64};
/// Builds a deterministic 1000-row, three-column test table —
/// `id` (Int64, even numbers), `category` (Int32, cycling 0..100),
/// `value` (Float64, multiples of 3.14159) — and returns it together
/// with the schema (field list) used to construct it.
fn create_test_table() -> (Table, Vec<Field>) {
    let row_count = 1000;

    // Column 1: non-nullable Int64 ids (0, 2, 4, ...).
    let id_field = Field {
        name: "id".into(),
        dtype: ArrowType::Int64,
        nullable: false,
        metadata: Default::default(),
    };
    let id_values: Vec64<i64> = (0..row_count).map(|i| i as i64 * 2).collect();
    let id_col = Array::NumericArray(NumericArray::Int64(Arc::new(IntegerArray {
        data: Buffer::from(id_values),
        null_mask: None,
    })));

    // Column 2: non-nullable Int32 categories repeating 0..100.
    let category_field = Field {
        name: "category".into(),
        dtype: ArrowType::Int32,
        nullable: false,
        metadata: Default::default(),
    };
    let category_values: Vec64<i32> = (0..row_count).map(|i| (i as i32) % 100).collect();
    let category_col = Array::NumericArray(NumericArray::Int32(Arc::new(minarrow::IntegerArray {
        data: Buffer::from(category_values),
        null_mask: None,
    })));

    // Column 3: non-nullable Float64 values scaled by a pi-like constant.
    let value_field = Field {
        name: "value".into(),
        dtype: ArrowType::Float64,
        nullable: false,
        metadata: Default::default(),
    };
    let value_values: Vec64<f64> = (0..row_count).map(|i| (i as f64) * 3.14159).collect();
    let value_col = Array::NumericArray(NumericArray::Float64(Arc::new(minarrow::FloatArray {
        data: Buffer::from(value_values),
        null_mask: None,
    })));

    // Schema clones the fields so the originals can move into the columns.
    let schema = vec![id_field.clone(), category_field.clone(), value_field.clone()];
    let table = Table {
        name: "compression_integration_test".to_string(),
        n_rows: row_count,
        cols: vec![
            FieldArray::new(id_field, id_col),
            FieldArray::new(category_field, category_col),
            FieldArray::new(value_field, value_col),
        ],
    };
    (table, schema)
}
/// Writes one table uncompressed via the Arrow IPC *file* protocol and
/// checks the on-disk framing: leading "ARROW1\0\0" magic and trailing
/// "ARROW1" magic.
#[tokio::test]
async fn test_compression_none_integration() {
    let temp_file = NamedTempFile::new().unwrap();
    let path = temp_file.path();
    let (table, schema) = create_test_table();

    let file = File::create(path).await.unwrap();
    let mut writer =
        TableWriter::with_compression(file, schema, IPCMessageProtocol::File, Compression::None)
            .unwrap();
    writer.write_all_tables(vec![table]).await.unwrap();
    // Drop the writer before reading so the file handle is released.
    drop(writer);

    let mut reader = File::open(path).await.unwrap();
    let mut bytes = Vec::new();
    reader.read_to_end(&mut bytes).await.unwrap();

    assert!(!bytes.is_empty());
    assert!(bytes.starts_with(b"ARROW1\0\0"));
    assert!(bytes.ends_with(b"ARROW1"));
    println!("✓ Uncompressed file size: {} bytes", bytes.len());
    println!("✓ Compression::None integration test passed");
}
/// Round-trips a table through the writer with Snappy compression enabled
/// and verifies the Arrow IPC file framing survives intact.
#[cfg(feature = "snappy")]
#[tokio::test]
async fn test_snappy_compression_integration() {
    let temp_file = NamedTempFile::new().unwrap();
    let path = temp_file.path();
    let (table, schema) = create_test_table();

    let file = File::create(path).await.unwrap();
    let mut writer =
        TableWriter::with_compression(file, schema, IPCMessageProtocol::File, Compression::Snappy)
            .unwrap();
    writer.write_all_tables(vec![table]).await.unwrap();
    // Release the writer (and file handle) before re-opening for read.
    drop(writer);

    let mut reader = File::open(path).await.unwrap();
    let mut bytes = Vec::new();
    reader.read_to_end(&mut bytes).await.unwrap();

    assert!(!bytes.is_empty());
    assert!(bytes.starts_with(b"ARROW1\0\0"));
    assert!(bytes.ends_with(b"ARROW1"));
    println!("✓ Snappy compressed file size: {} bytes", bytes.len());
    println!("✓ Snappy compression integration test passed");
}
/// Round-trips a table through the writer with Zstd compression enabled
/// and verifies the Arrow IPC file framing survives intact.
#[cfg(feature = "zstd")]
#[tokio::test]
async fn test_zstd_compression_integration() {
    let temp_file = NamedTempFile::new().unwrap();
    let path = temp_file.path();
    let (table, schema) = create_test_table();

    let file = File::create(path).await.unwrap();
    let mut writer =
        TableWriter::with_compression(file, schema, IPCMessageProtocol::File, Compression::Zstd)
            .unwrap();
    writer.write_all_tables(vec![table]).await.unwrap();
    // Release the writer (and file handle) before re-opening for read.
    drop(writer);

    let mut reader = File::open(path).await.unwrap();
    let mut bytes = Vec::new();
    reader.read_to_end(&mut bytes).await.unwrap();

    assert!(!bytes.is_empty());
    assert!(bytes.starts_with(b"ARROW1\0\0"));
    assert!(bytes.ends_with(b"ARROW1"));
    println!("✓ Zstd compressed file size: {} bytes", bytes.len());
    println!("✓ Zstd compression integration test passed");
}
/// Writes the same table once per available codec and prints each file's
/// size as a percentage of the uncompressed baseline. The Snappy/Zstd
/// sections only compile when their cargo features are enabled; the
/// underscore-prefixed temp files avoid unused-variable warnings otherwise.
#[tokio::test]
async fn test_compression_size_comparison() {
    let (table, schema) = create_test_table();
    let temp_none = NamedTempFile::new().unwrap();
    let _temp_snappy = NamedTempFile::new().unwrap();
    let _temp_zstd = NamedTempFile::new().unwrap();
    let mut sizes: Vec<(&str, u64)> = Vec::new();

    // Baseline: uncompressed file.
    {
        let file = File::create(temp_none.path()).await.unwrap();
        let mut writer = TableWriter::with_compression(
            file,
            schema.clone(),
            IPCMessageProtocol::File,
            Compression::None,
        )
        .unwrap();
        writer.write_all_tables(vec![table.clone()]).await.unwrap();
    }
    let none_size = std::fs::metadata(temp_none.path()).unwrap().len();
    sizes.push(("None", none_size));

    #[cfg(feature = "snappy")]
    {
        let file = File::create(_temp_snappy.path()).await.unwrap();
        let mut writer = TableWriter::with_compression(
            file,
            schema.clone(),
            IPCMessageProtocol::File,
            Compression::Snappy,
        )
        .unwrap();
        writer.write_all_tables(vec![table.clone()]).await.unwrap();
        sizes.push(("Snappy", std::fs::metadata(_temp_snappy.path()).unwrap().len()));
    }

    #[cfg(feature = "zstd")]
    {
        let file = File::create(_temp_zstd.path()).await.unwrap();
        let mut writer = TableWriter::with_compression(
            file,
            schema.clone(),
            IPCMessageProtocol::File,
            Compression::Zstd,
        )
        .unwrap();
        writer.write_all_tables(vec![table.clone()]).await.unwrap();
        sizes.push(("Zstd", std::fs::metadata(_temp_zstd.path()).unwrap().len()));
    }

    // Percentage of the uncompressed size, guarding a zero-byte baseline.
    let pct_of_baseline = |size: u64| -> f64 {
        if none_size > 0 {
            size as f64 / none_size as f64 * 100.0
        } else {
            100.0
        }
    };
    println!("Compression size comparison:");
    for (name, size) in &sizes {
        let ratio = pct_of_baseline(*size);
        println!(" {}: {} bytes ({:.1}% of uncompressed)", name, size, ratio);
    }
    assert!(none_size > 0);
    println!("✓ Compression size comparison completed");
}
/// Writes via the *stream* IPC protocol, which carries no "ARROW1" file
/// magic; instead the output must open with the 0xFFFFFFFF continuation
/// marker of the first encapsulated message.
#[tokio::test]
async fn test_stream_protocol_with_compression() {
    let temp_file = NamedTempFile::new().unwrap();
    let path = temp_file.path();
    let (table, schema) = create_test_table();

    let file = File::create(path).await.unwrap();
    let mut writer =
        TableWriter::with_compression(file, schema, IPCMessageProtocol::Stream, Compression::None)
            .unwrap();
    writer.write_all_tables(vec![table]).await.unwrap();
    // Release the writer (and file handle) before re-opening for read.
    drop(writer);

    let mut reader = File::open(path).await.unwrap();
    let mut bytes = Vec::new();
    reader.read_to_end(&mut bytes).await.unwrap();

    assert!(!bytes.is_empty());
    assert_eq!(&bytes[..4], &[0xFF, 0xFF, 0xFF, 0xFF]);
    println!("✓ Stream protocol with compression test passed");
}
/// Writes two tables (same schema, different names) into a single IPC file
/// and checks the file-level framing still holds around multiple batches.
#[tokio::test]
async fn test_multiple_tables_with_compression() {
    let temp_file = NamedTempFile::new().unwrap();
    let path = temp_file.path();
    let (first, schema) = create_test_table();
    let (mut second, _) = create_test_table();
    second.name = "second_table".to_string();

    let file = File::create(path).await.unwrap();
    let mut writer =
        TableWriter::with_compression(file, schema, IPCMessageProtocol::File, Compression::None)
            .unwrap();
    writer.write_all_tables(vec![first, second]).await.unwrap();
    // Release the writer (and file handle) before re-opening for read.
    drop(writer);

    let mut reader = File::open(path).await.unwrap();
    let mut bytes = Vec::new();
    reader.read_to_end(&mut bytes).await.unwrap();

    assert!(!bytes.is_empty());
    assert!(bytes.starts_with(b"ARROW1\0\0"));
    assert!(bytes.ends_with(b"ARROW1"));
    println!("✓ Multiple tables with compression test passed");
}