use arrow_zerobus_sdk_wrapper::wrapper::debug::DebugWriter;
use arrow::array::{Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;
use tokio::time::sleep;
#[tokio::test]
async fn test_debug_writer_new() {
    // Constructing a DebugWriter should succeed and eagerly create the
    // arrow/proto output subdirectories under the base path.
    let temp_dir = TempDir::new().unwrap();
    let base = temp_dir.path().to_path_buf();

    let writer = DebugWriter::new(
        base.clone(),
        String::from("test_table"),
        Duration::from_secs(5),
        Some(1024 * 1024),
        Some(10),
    );
    assert!(writer.is_ok());

    assert!(base.join("zerobus/arrow").exists());
    assert!(base.join("zerobus/proto").exists());
}
#[tokio::test]
async fn test_debug_writer_new_invalid_directory() {
    // Exercise the constructor with a path that normally cannot be
    // created. No assertion is made on the outcome (it may legitimately
    // succeed when running with elevated privileges); this test only
    // checks that construction does not panic.
    let bad_dir = PathBuf::from("/nonexistent/path/debug");
    let result = DebugWriter::new(
        bad_dir,
        String::from("test_table"),
        Duration::from_secs(5),
        None,
        Some(10),
    );
    drop(result);
}
#[tokio::test]
async fn test_debug_writer_write_arrow() {
    // Writing a record batch should succeed, and a subsequent flush
    // should leave at least one file under zerobus/arrow.
    let temp_dir = TempDir::new().unwrap();
    let output_dir = temp_dir.path().to_path_buf();
    let mut writer = DebugWriter::new(
        output_dir.clone(),
        "test_table".to_string(),
        Duration::from_secs(5),
        None,
        Some(10),
    ).unwrap();

    let schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ]);
    let id_array = Int64Array::from(vec![1, 2, 3]);
    let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(id_array), Arc::new(name_array)],
    ).unwrap();

    let result = writer.write_arrow(&batch).await;
    assert!(result.is_ok());

    writer.flush().await.unwrap();
    sleep(Duration::from_millis(100)).await;

    // The original test bound an `arrow_file` path but never used it;
    // assert instead that the flush actually produced output.
    let arrow_dir = output_dir.join("zerobus/arrow");
    assert!(
        std::fs::read_dir(&arrow_dir).unwrap().next().is_some(),
        "expected at least one arrow file after flush"
    );
}
#[tokio::test]
async fn test_debug_writer_write_protobuf() {
    // Writing protobuf bytes should succeed, and a subsequent flush
    // should leave at least one file under zerobus/proto.
    let temp_dir = TempDir::new().unwrap();
    let output_dir = temp_dir.path().to_path_buf();
    let mut writer = DebugWriter::new(
        output_dir.clone(),
        "test_table".to_string(),
        Duration::from_secs(5),
        None,
        Some(10),
    ).unwrap();

    let protobuf_bytes = b"test protobuf data";
    let result = writer.write_protobuf(protobuf_bytes).await;
    assert!(result.is_ok());

    writer.flush().await.unwrap();
    sleep(Duration::from_millis(100)).await;

    // The original test bound a `proto_file` path but never used it;
    // assert instead that the flush actually produced output.
    let proto_dir = output_dir.join("zerobus/proto");
    assert!(
        std::fs::read_dir(&proto_dir).unwrap().next().is_some(),
        "expected at least one proto file after flush"
    );
}
#[tokio::test]
async fn test_debug_writer_flush() {
    // Flushing a writer with no buffered data should still succeed.
    // Bug fix: the original call passed only 3 arguments to
    // DebugWriter::new, while every other call site in this file passes
    // 5 (path, table name, interval, max size, retention) — it could
    // not have compiled. Supply the missing table name and retention.
    let temp_dir = TempDir::new().unwrap();
    let output_dir = temp_dir.path().to_path_buf();
    let mut writer = DebugWriter::new(
        output_dir,
        "test_table".to_string(),
        Duration::from_secs(5),
        None,
        Some(10),
    ).unwrap();

    let result = writer.flush().await;
    assert!(result.is_ok());
}
#[tokio::test]
async fn test_debug_writer_should_flush() {
    // should_flush() flips from false to true once the configured
    // flush interval (100 ms here) has elapsed.
    let temp_dir = TempDir::new().unwrap();
    let writer = DebugWriter::new(
        temp_dir.path().to_path_buf(),
        String::from("test_table"),
        Duration::from_millis(100),
        None,
        Some(10),
    )
    .unwrap();

    assert!(!writer.should_flush().await);
    sleep(Duration::from_millis(150)).await;
    assert!(writer.should_flush().await);
}
#[tokio::test]
async fn test_debug_writer_multiple_writes() {
    // Interleaved arrow and protobuf writes followed by a flush should
    // all succeed without error.
    let temp_dir = TempDir::new().unwrap();
    let output_dir = temp_dir.path().to_path_buf();
    let mut writer = DebugWriter::new(
        output_dir.clone(),
        "test_table".to_string(),
        Duration::from_secs(5),
        None,
        Some(10),
    )
    .unwrap();

    // Build two single-column batches from a shared schema.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let make_batch = |values: Vec<i64>| {
        RecordBatch::try_new(schema.clone(), vec![Arc::new(Int64Array::from(values))]).unwrap()
    };
    let batch1 = make_batch(vec![1, 2]);
    let batch2 = make_batch(vec![3, 4]);

    writer.write_arrow(&batch1).await.unwrap();
    writer.write_arrow(&batch2).await.unwrap();
    writer.write_protobuf(b"chunk1").await.unwrap();
    writer.write_protobuf(b"chunk2").await.unwrap();
    writer.flush().await.unwrap();
}
#[tokio::test]
async fn test_rotation_no_recursive_timestamps() {
use arrow_zerobus_sdk_wrapper::wrapper::debug::DebugWriter;
let temp_dir = TempDir::new().unwrap();
let base_path = temp_dir.path().join("test_table.arrows");
std::fs::File::create(&base_path).unwrap();
let writer = DebugWriter::new(
temp_dir.path().to_path_buf(),
"test_table".to_string(),
Duration::from_secs(5),
None,
Some(10),
).unwrap();
let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
let batch = RecordBatch::try_new(
Arc::new(schema),
vec![Arc::new(Int64Array::from(vec![1; 1001]))], ).unwrap();
for _ in 0..2 {
writer.write_arrow(&batch).await.unwrap();
}
let entries: Vec<_> = std::fs::read_dir(temp_dir.path().join("zerobus/arrow"))
.unwrap()
.map(|e| e.unwrap().file_name().to_string_lossy().to_string())
.collect();
for entry in &entries {
let mut timestamp_count = 0;
let mut search_start = 0;
while let Some(pos) = entry[search_start..].find("_20") {
let actual_pos = search_start + pos;
if actual_pos + 3 + 8 + 1 + 6 <= entry.len() {
let date_part = &entry[actual_pos + 3..actual_pos + 3 + 8];
let separator = &entry[actual_pos + 3 + 8..actual_pos + 3 + 8 + 1];
let time_part = &entry[actual_pos + 3 + 8 + 1..actual_pos + 3 + 8 + 1 + 6];
if date_part.chars().all(|c| c.is_ascii_digit())
&& separator == "_"
&& time_part.chars().all(|c| c.is_ascii_digit())
{
timestamp_count += 1;
}
}
search_start = actual_pos + 3;
}
assert!(
timestamp_count <= 1,
"Filename should have at most one timestamp pattern, got {} in: {}",
timestamp_count,
entry
);
assert!(
entry == "test_table.arrows" || (entry.starts_with("test_table_20") && entry.ends_with(".arrows")),
"Unexpected filename format: {}",
entry
);
}
}
#[tokio::test]
async fn test_generate_rotated_path_with_existing_timestamp() {
    use std::fs;
    // A pre-existing rotated file that already carries a timestamp must
    // not gain a second date segment when further rotations occur.
    let temp_dir = TempDir::new().unwrap();
    let arrow_dir = temp_dir.path().join("zerobus/arrow");
    fs::create_dir_all(&arrow_dir).unwrap();
    fs::File::create(arrow_dir.join("test_table_20250101_120000.arrows")).unwrap();

    let writer = DebugWriter::new(
        temp_dir.path().to_path_buf(),
        "test_table".to_string(),
        Duration::from_secs(5),
        None,
        Some(10),
    )
    .unwrap();

    let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(Int64Array::from(vec![1; 1001]))],
    )
    .unwrap();
    writer.write_arrow(&batch).await.unwrap();
    writer.write_arrow(&batch).await.unwrap();

    for entry in fs::read_dir(&arrow_dir)
        .unwrap()
        .map(|e| e.unwrap().file_name().to_string_lossy().to_string())
    {
        // Count underscore-separated segments that look like a YYYYMMDD date.
        let timestamp_like_parts = entry
            .split('_')
            .filter(|p| p.len() == 8 && p.chars().all(|c| c.is_ascii_digit()))
            .count();
        assert!(timestamp_like_parts <= 1, "Should have at most one date part: {}", entry);
    }
}
#[tokio::test]
async fn test_file_retention_cleanup() {
    use std::fs;
    // With Some(10) retention, old rotated files should be pruned so
    // at most 11 files remain (10 retained + the active one).
    let temp_dir = TempDir::new().unwrap();
    let arrow_dir = temp_dir.path().join("zerobus/arrow");
    fs::create_dir_all(&arrow_dir).unwrap();

    // Seed 12 pre-existing rotated files with strictly increasing
    // mtimes (oldest first) so cleanup ordering is deterministic.
    for i in 0..12 {
        let file_path = arrow_dir.join(format!("test_table_20250101_{:06}.arrows", i * 100));
        fs::File::create(&file_path).unwrap();
        let mtime = std::time::SystemTime::now() - Duration::from_secs((12 - i) as u64);
        let ft = filetime::FileTime::from_system_time(mtime);
        filetime::set_file_times(&file_path, ft, ft).unwrap();
    }

    let writer = DebugWriter::new(
        temp_dir.path().to_path_buf(),
        "test_table".to_string(),
        Duration::from_secs(5),
        None,
        Some(10),
    )
    .unwrap();

    let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(Int64Array::from(vec![1; 1001]))],
    )
    .unwrap();
    writer.write_arrow(&batch).await.unwrap();
    sleep(Duration::from_millis(100)).await;

    let remaining = fs::read_dir(&arrow_dir).unwrap().count();
    assert!(remaining <= 11, "Should have at most 11 files after cleanup, got {}", remaining);
}
#[tokio::test]
async fn test_file_retention_unlimited() {
    use std::fs;
    // With retention = None, rotated files are never deleted: all five
    // seeded files plus the newly written one must survive.
    let temp_dir = TempDir::new().unwrap();
    let arrow_dir = temp_dir.path().join("zerobus/arrow");
    fs::create_dir_all(&arrow_dir).unwrap();

    for i in 0..5 {
        fs::File::create(arrow_dir.join(format!("test_table_20250101_{:06}.arrows", i * 100)))
            .unwrap();
    }

    let writer = DebugWriter::new(
        temp_dir.path().to_path_buf(),
        "test_table".to_string(),
        Duration::from_secs(5),
        None,
        None,
    )
    .unwrap();

    let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(Int64Array::from(vec![1; 1001]))],
    )
    .unwrap();
    writer.write_arrow(&batch).await.unwrap();
    sleep(Duration::from_millis(100)).await;

    let file_count = fs::read_dir(&arrow_dir).unwrap().count();
    assert!(file_count >= 6, "Should retain all files with unlimited retention");
}
#[tokio::test]
async fn test_sequential_naming_when_filename_too_long() {
    // With a 200-char table name, rotated filenames must stay below
    // filesystem name-length limits (sequential-suffix fallback).
    let temp_dir = TempDir::new().unwrap();
    let writer = DebugWriter::new(
        temp_dir.path().to_path_buf(),
        // `str::repeat` already returns a String — the original's extra
        // `.to_string()` was a redundant allocation (clippy).
        "a".repeat(200),
        Duration::from_secs(5),
        None,
        Some(10),
    ).unwrap();

    let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(Int64Array::from(vec![1; 1001]))],
    ).unwrap();
    for _ in 0..3 {
        writer.write_arrow(&batch).await.unwrap();
    }

    let arrow_dir = temp_dir.path().join("zerobus/arrow");
    let entries: Vec<_> = std::fs::read_dir(&arrow_dir)
        .unwrap()
        .map(|e| e.unwrap().file_name().to_string_lossy().to_string())
        .collect();
    for entry in entries {
        assert!(entry.len() < 300, "Filename should not exceed filesystem limits: {}", entry);
    }
}