use arrow_zerobus_sdk_wrapper::wrapper::debug::DebugWriter;
use arrow_zerobus_sdk_wrapper::ZerobusError;
use arrow::array::{Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;
fn create_test_batch() -> RecordBatch {
let schema = Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("name", DataType::Utf8, false),
]);
let id_array = Int64Array::from(vec![1, 2, 3]);
let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
RecordBatch::try_new(
Arc::new(schema),
vec![Arc::new(id_array), Arc::new(name_array)],
)
.unwrap()
}
fn create_large_batch(size_mb: usize) -> RecordBatch {
let num_rows = size_mb * 1024 * 1024 / 20; let schema = Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("data", DataType::Utf8, false),
]);
let ids: Vec<i64> = (0..num_rows).map(|i| i as i64).collect();
let data: Vec<String> = (0..num_rows)
.map(|i| format!("data_{}", i))
.collect();
let id_array = Int64Array::from(ids);
let data_array = StringArray::from(data);
RecordBatch::try_new(
Arc::new(schema),
vec![Arc::new(id_array), Arc::new(data_array)],
)
.unwrap()
}
#[tokio::test]
async fn test_arrow_file_rotation_when_size_exceeded() {
let temp_dir = TempDir::new().unwrap();
let debug_writer = DebugWriter::new(
temp_dir.path().to_path_buf(),
"test_table".to_string(),
Duration::from_secs(5),
Some(1024), )
.unwrap();
let batch = create_large_batch(1); debug_writer.write_arrow(&batch).await.unwrap();
debug_writer.flush().await.unwrap();
debug_writer.write_arrow(&batch).await.unwrap();
debug_writer.flush().await.unwrap();
let arrow_dir = temp_dir.path().join("zerobus/arrow");
let files: Vec<_> = std::fs::read_dir(&arrow_dir)
.unwrap()
.filter_map(|e| e.ok())
.map(|e| e.file_name())
.collect();
assert!(!files.is_empty(), "Expected at least one Arrow file");
let has_rotated = files.iter().any(|f| {
let name = f.to_string_lossy();
name.contains("_") && name.contains(".arrow")
});
assert!(files.len() >= 1);
}
#[tokio::test]
async fn test_protobuf_file_rotation_when_size_exceeded() {
let temp_dir = TempDir::new().unwrap();
let debug_writer = DebugWriter::new(
temp_dir.path().to_path_buf(),
"test_table".to_string(),
Duration::from_secs(5),
Some(1024), )
.unwrap();
let test_bytes = vec![0u8; 200]; for _ in 0..10 {
debug_writer.write_protobuf(&test_bytes, false).await.unwrap();
}
debug_writer.flush().await.unwrap();
let proto_dir = temp_dir.path().join("zerobus/proto");
let files: Vec<_> = std::fs::read_dir(&proto_dir)
.unwrap()
.filter_map(|e| e.ok())
.map(|e| e.file_name())
.collect();
assert!(!files.is_empty(), "Expected at least one Protobuf file");
}
#[tokio::test]
async fn test_no_rotation_when_size_not_exceeded() {
let temp_dir = TempDir::new().unwrap();
let debug_writer = DebugWriter::new(
temp_dir.path().to_path_buf(),
"test_table".to_string(),
Duration::from_secs(5),
Some(10 * 1024 * 1024), )
.unwrap();
let batch = create_test_batch();
for _ in 0..100 {
debug_writer.write_arrow(&batch).await.unwrap();
}
debug_writer.flush().await.unwrap();
let arrow_dir = temp_dir.path().join("zerobus/arrow");
let files: Vec<_> = std::fs::read_dir(&arrow_dir)
.unwrap()
.filter_map(|e| e.ok())
.map(|e| e.file_name())
.collect();
let arrow_files: Vec<_> = files
.iter()
.filter(|f| f.to_string_lossy().ends_with(".arrow"))
.collect();
assert_eq!(arrow_files.len(), 1, "Expected exactly one Arrow file (no rotation)");
}
#[tokio::test]
async fn test_rotation_exact_size_boundary() {
let temp_dir = TempDir::new().unwrap();
let max_size = 2048; let debug_writer = DebugWriter::new(
temp_dir.path().to_path_buf(),
"test_table".to_string(),
Duration::from_secs(5),
Some(max_size),
)
.unwrap();
let batch = create_test_batch();
for _ in 0..50 {
debug_writer.write_arrow(&batch).await.unwrap();
}
debug_writer.flush().await.unwrap();
let arrow_file = temp_dir.path().join("zerobus/arrow/test_table.arrow");
if arrow_file.exists() {
let metadata = std::fs::metadata(&arrow_file).unwrap();
let file_size = metadata.len();
debug_writer.write_arrow(&batch).await.unwrap();
debug_writer.flush().await.unwrap();
let arrow_dir = temp_dir.path().join("zerobus/arrow");
let files: Vec<_> = std::fs::read_dir(&arrow_dir)
.unwrap()
.filter_map(|e| e.ok())
.map(|e| e.file_name())
.collect();
if file_size >= max_size {
let rotated_files: Vec<_> = files
.iter()
.filter(|f| {
let name = f.to_string_lossy();
name.contains("_") && name.ends_with(".arrow")
})
.collect();
assert!(files.len() >= 1);
}
}
}
#[tokio::test]
async fn test_multiple_rotations() {
let temp_dir = TempDir::new().unwrap();
let debug_writer = DebugWriter::new(
temp_dir.path().to_path_buf(),
"test_table".to_string(),
Duration::from_secs(5),
Some(512), )
.unwrap();
let batch = create_large_batch(1);
for _ in 0..5 {
debug_writer.write_arrow(&batch).await.unwrap();
debug_writer.flush().await.unwrap();
}
let arrow_dir = temp_dir.path().join("zerobus/arrow");
let files: Vec<_> = std::fs::read_dir(&arrow_dir)
.unwrap()
.filter_map(|e| e.ok())
.map(|e| e.file_name())
.collect();
assert!(!files.is_empty(), "Expected at least one file");
for file in &files {
let file_path = arrow_dir.join(file);
assert!(file_path.exists(), "File should exist: {:?}", file);
}
}
#[tokio::test]
async fn test_rotation_with_no_max_size() {
let temp_dir = TempDir::new().unwrap();
let debug_writer = DebugWriter::new(
temp_dir.path().to_path_buf(),
"test_table".to_string(),
Duration::from_secs(5),
None, )
.unwrap();
let batch = create_large_batch(1);
for _ in 0..20 {
debug_writer.write_arrow(&batch).await.unwrap();
}
debug_writer.flush().await.unwrap();
let arrow_dir = temp_dir.path().join("zerobus/arrow");
let files: Vec<_> = std::fs::read_dir(&arrow_dir)
.unwrap()
.filter_map(|e| e.ok())
.map(|e| e.file_name())
.collect();
let arrow_files: Vec<_> = files
.iter()
.filter(|f| f.to_string_lossy().ends_with(".arrow"))
.collect();
assert_eq!(arrow_files.len(), 1, "Expected exactly one Arrow file (no rotation)");
}
#[tokio::test]
async fn test_rotation_file_naming() {
let temp_dir = TempDir::new().unwrap();
let debug_writer = DebugWriter::new(
temp_dir.path().to_path_buf(),
"test_table".to_string(),
Duration::from_secs(5),
Some(1024), )
.unwrap();
let batch = create_large_batch(1);
for _ in 0..5 {
debug_writer.write_arrow(&batch).await.unwrap();
debug_writer.flush().await.unwrap();
}
let arrow_dir = temp_dir.path().join("zerobus/arrow");
let files: Vec<_> = std::fs::read_dir(&arrow_dir)
.unwrap()
.filter_map(|e| e.ok())
.map(|e| e.file_name())
.collect();
for file in &files {
let name = file.to_string_lossy();
assert!(
name == "test_table.arrow" ||
(name.starts_with("test_table_") && name.ends_with(".arrow")),
"Unexpected file name: {}",
name
);
if name.starts_with("test_table_") && name != "test_table.arrow" {
let timestamp_part = name
.strip_prefix("test_table_")
.unwrap()
.strip_suffix(".arrow")
.unwrap();
assert_eq!(timestamp_part.len(), 15, "Timestamp should be 15 characters");
assert!(timestamp_part.chars().all(|c| c.is_ascii_digit() || c == '_'));
}
}
}