use crate::timon_engine::helpers::*;
use datafusion::arrow::array::{
Array, ArrayRef, BooleanArray, BooleanBuilder, Date32Array, Float64Array, Float64Builder, Int32Array, Int64Array, Int64Builder, ListBuilder,
StringArray, StringBuilder, StructArray, TimestampMillisecondArray, TimestampNanosecondArray,
};
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::arrow::record_batch::RecordBatch;
use serde_json::json;
use std::fs;
use std::sync::Arc;
/// Verifies that a two-column batch (Int64 `id`, Utf8 `name`) is rendered as
/// a JSON array containing one object per row.
#[test]
fn test_record_batches_to_json() {
    let ids: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let names: ArrayRef = Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"]));
    let fields = vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ];
    let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), vec![ids, names]).unwrap();

    let expected = json!([
        {"id": 1, "name": "Alice"},
        {"id": 2, "name": "Bob"},
        {"id": 3, "name": "Charlie"}
    ]);
    let rows = record_batches_to_json(&[batch]).unwrap();
    assert_eq!(rows, expected);
}
/// Converts two JSON objects with three keys each and checks that the result
/// has three schema fields and three arrays, with one entry per input object.
#[test]
fn test_json_to_arrow() {
    let rows = vec![
        json!({"id": 1, "name": "Alice", "score": 95.5}),
        json!({"id": 2, "name": "Bob", "score": 88.0}),
    ];

    let (columns, schema) = json_to_arrow(&rows, None, None).unwrap();

    let field_count = schema.fields().len();
    assert_eq!(field_count, 3);
    assert_eq!(columns.len(), 3);
    let first_column = &columns[0];
    assert_eq!(first_column.len(), 2);
}
/// Checks that asking for the `unique` property yields exactly the two fields
/// flagged `"unique": true`, in the order `email`, `id`.
#[test]
fn test_get_property_fields() {
    let field_defs = json!({
        "id": {"type": "int", "unique": true},
        "name": {"type": "string"},
        "email": {"type": "string", "unique": true},
        "age": {"type": "int"}
    });
    let expected = vec!["email", "id"];

    let unique_fields = get_property_fields(&field_defs, "unique").unwrap();

    assert_eq!(unique_fields, expected);
}
/// Filters three dated file names down to January 2023 and expects the
/// February file to be dropped.
#[test]
fn test_filter_files_by_date_range() {
    let candidates: Vec<String> = [
        "data_2023-01-01.parquet",
        "data_2023-01-15.parquet",
        "data_2023-02-01.parquet",
    ]
    .iter()
    .map(|name| name.to_string())
    .collect();

    let in_january = filter_files_by_date_range(candidates, "2023-01-01", "2023-01-31").unwrap();

    assert_eq!(in_january, vec!["data_2023-01-01.parquet", "data_2023-01-15.parquet"]);
}
/// Combines two single-batch inputs that share `id` 2 and expects a single
/// output batch holding three rows when de-duplicating on `id`.
#[test]
fn test_combine_unique_batches() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Int64, false),
    ]));
    // Small factory so both batches are built the same way.
    let make_batch = |ids: Vec<i64>, values: Vec<i64>| {
        let columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from(ids)), Arc::new(Int64Array::from(values))];
        RecordBatch::try_new(Arc::clone(&schema), columns).unwrap()
    };
    let first = make_batch(vec![1, 2], vec![10, 20]);
    let second = make_batch(vec![2, 3], vec![25, 30]);

    let merged = combine_unique_batches(vec![first], vec![second], &["id".to_string()]).unwrap();

    assert_eq!(merged.len(), 1);
    let only_batch = &merged[0];
    assert_eq!(only_batch.num_rows(), 3);
}
/// Expects four rules from a schema in which two fields each declare both a
/// `min` and a `max`, and a third field declares neither.
#[test]
fn test_build_rules_tree() {
    let field_defs = json!({
        "age": {"type": "int", "min": 18, "max": 100},
        "score": {"type": "float", "min": 0.0, "max": 100.0},
        "name": {"type": "string"}
    });

    let rule_count = build_rules_tree(field_defs).len();

    assert_eq!(rule_count, 4);
}
/// Writes two dated files into a temporary directory and runs
/// `cleanup_old_files` over them.
///
/// NOTE(review): the final assertion only checks the length of the local path
/// vector; it does not observe whether either file was removed.
#[tokio::test]
async fn test_cleanup_old_files() {
    let scratch = tempfile::tempdir().unwrap();
    let past_file = scratch.path().join("data_2020-01-01.txt");
    let future_file = scratch.path().join("data_2030-01-01.txt");
    for (path, body) in [(&past_file, "content1"), (&future_file, "content2")] {
        fs::write(path, body).unwrap();
    }
    assert!(past_file.exists());
    assert!(future_file.exists());

    let files = vec![past_file, future_file];
    cleanup_old_files(&files).await;

    assert!(files.len() == 2);
}
/// Converts three objects mixing integer, string, boolean and float values
/// and checks the resulting column count (4) and row count (3).
#[test]
fn test_json_to_arrow_with_mixed_types() {
    let rows = vec![
        json!({"id": 1, "name": "Alice", "active": true, "score": 95.5}),
        json!({"id": 2, "name": "Bob", "active": false, "score": 88.0}),
        json!({"id": 3, "name": "Charlie", "active": true, "score": 92.5}),
    ];

    let (columns, schema) = json_to_arrow(&rows, None, None).unwrap();

    let field_count = schema.fields().len();
    assert_eq!(field_count, 4);
    assert_eq!(columns.len(), 4);
    let first_column = &columns[0];
    assert_eq!(first_column.len(), 3);
}
/// An empty schema object must succeed and yield no field names.
#[test]
fn test_get_property_fields_with_empty_schema() {
    let no_fields = json!({});

    let outcome = get_property_fields(&no_fields, "unique");

    assert!(outcome.is_ok());
    let names = outcome.unwrap();
    assert_eq!(names, Vec::<String>::new());
}
/// Asking for a property that no field declares must succeed with an empty
/// list rather than fail.
#[test]
fn test_get_property_fields_with_nonexistent_property() {
    let field_defs = json!({
        "id": {"type": "int", "unique": true},
        "name": {"type": "string"},
    });

    let outcome = get_property_fields(&field_defs, "nonexistent");

    assert!(outcome.is_ok());
    let names = outcome.unwrap();
    assert_eq!(names, Vec::<String>::new());
}
/// Two expectations: combining two empty inputs is an error, while combining
/// one three-row batch with an empty input returns that single batch intact.
#[test]
fn test_combine_unique_batches_with_empty_batches() {
    let no_batches = Vec::<RecordBatch>::new();
    let key = ["id".to_string()];

    // Both sides empty: expected to fail.
    let both_empty = combine_unique_batches(no_batches.clone(), no_batches.clone(), &key);
    assert!(both_empty.is_err());

    // Only the second side empty: the first side's rows come back.
    let ids: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(schema, vec![ids]).unwrap();
    let one_sided = combine_unique_batches(vec![batch], no_batches, &key);
    assert!(one_sided.is_ok());
    let merged = one_sided.unwrap();
    assert_eq!(merged.len(), 1);
    assert_eq!(merged[0].num_rows(), 3);
}
/// Smoke test: `build_rules_tree` must not panic on a schema whose fields
/// carry `unique` flags and explicit `rules` arrays.
///
/// The previous `rules.len() >= 0` assertion was always true for an unsigned
/// length, so it has been dropped; completing the call is the whole check.
#[test]
fn test_build_rules_tree_with_complex_schema() {
    let complex_schema = json!({
        "id": {"type": "int", "unique": true},
        "name": {"type": "string"},
        "age": {"type": "int", "rules": [
            {"operator": "greater_than", "value": 18},
            {"operator": "less_than", "value": 100}
        ]},
        "email": {"type": "string", "unique": true, "rules": [
            {"operator": "contains", "value": "@"}
        ]},
        "score": {"type": "float", "rules": [
            {"operator": "greater_than_or_equal", "value": 0.0},
            {"operator": "less_than_or_equal", "value": 100.0}
        ]}
    });

    let _rules = build_rules_tree(complex_schema);
}
/// An existing temporary file must report a modification time; a path that
/// does not exist must report none.
#[test]
fn test_get_local_file_modified_time() {
    let existing = tempfile::NamedTempFile::new().unwrap();
    let existing_path = existing.path().to_str().unwrap();

    let found = get_local_file_modified_time(existing_path);
    assert!(found.is_some());

    let missing = get_local_file_modified_time("nonexistent_file.txt");
    assert!(missing.is_none());
}
/// Edge cases for `record_batches_to_json`: an empty list of batches and a
/// batch whose nullable columns contain nulls must both convert successfully.
#[test]
fn test_record_batches_to_json_edge_cases() {
    // No batches at all.
    let empty_batches: Vec<RecordBatch> = vec![];
    let result = record_batches_to_json(&empty_batches);
    assert!(result.is_ok());

    // Nullable columns with a null in each.
    let schema = Schema::new(vec![Field::new("id", DataType::Int64, true), Field::new("name", DataType::Utf8, true)]);
    let id_array: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), None, Some(3)]));
    let name_array: ArrayRef = Arc::new(StringArray::from(vec![Some("a"), Some("b"), None]));
    let batch = RecordBatch::try_new(Arc::new(schema), vec![id_array, name_array]).unwrap();
    // Borrow a fixed-size array, as the other call sites in this file do,
    // instead of allocating a throwaway Vec just to take a reference to it.
    let result = record_batches_to_json(&[batch]);
    assert!(result.is_ok());
}
/// Checks that two objects with integer, string and float values convert
/// without error.
#[test]
fn test_json_to_arrow_complex_types() {
    let rows = vec![
        json!({"id": 1, "name": "test1", "value": 10.5}),
        json!({"id": 2, "name": "test2", "value": 20.0}),
    ];

    let conversion = json_to_arrow(&rows, None, None);

    assert!(conversion.is_ok());
}
/// Calls `rounded_timestamp` with a fixed timestamp across a spread of
/// interval values and checks that every result is non-empty.
#[test]
fn test_rounded_timestamp_comprehensive() {
    let base_timestamp = 1672531200;

    for interval in [1, 5, 15, 30, 60, 120, 240, 480, 1440, 10080, 43200] {
        let rounded = rounded_timestamp(base_timestamp, interval);
        assert!(!rounded.is_empty());
    }
}
/// With some fields flagged `true`, some `false` and some not flagged at all,
/// both the `unique` and the `required` lookups must return two fields.
#[test]
fn test_get_property_fields_complex_schema() {
    let field_defs = json!({
        "id": {"type": "int", "unique": true, "required": true},
        "name": {"type": "string", "unique": false},
        "email": {"type": "string", "unique": true, "required": true},
        "age": {"type": "int", "unique": false},
        "active": {"type": "bool", "unique": false}
    });

    let unique_count = get_property_fields(&field_defs, "unique").unwrap().len();
    assert_eq!(unique_count, 2);

    let required_count = get_property_fields(&field_defs, "required").unwrap().len();
    assert_eq!(required_count, 2);
}
/// Runs the date-range filter over four dated file names with three different
/// ranges and checks how many names survive each one.
#[test]
fn test_filter_files_by_date_range_comprehensive() {
    let candidates: Vec<String> = [
        "data_2023-01-01.parquet",
        "data_2023-01-15.parquet",
        "data_2023-02-01.parquet",
        "data_2023-03-01.parquet",
    ]
    .iter()
    .map(|name| name.to_string())
    .collect();

    // (range start, range end, expected number of matching files)
    let cases = [
        ("2023-01-01", "2023-01-31", 2),
        ("2023-02-01", "2023-02-28", 1),
        ("2023-01-01", "2023-03-31", 4),
    ];
    for (start, end, expected) in cases {
        let matched = filter_files_by_date_range(candidates.clone(), start, end).unwrap();
        assert_eq!(matched.len(), expected);
    }
}
/// Covers three kinds of path: an existing file (time expected), a missing
/// file (none expected) and an existing directory (time expected).
#[test]
fn test_get_local_file_modified_time_comprehensive() {
    let existing = tempfile::NamedTempFile::new().unwrap();
    let existing_path = existing.path().to_str().unwrap();
    let for_file = get_local_file_modified_time(existing_path);
    assert!(for_file.is_some());

    let for_missing = get_local_file_modified_time("nonexistent_file.txt");
    assert!(for_missing.is_none());

    let scratch = tempfile::tempdir().unwrap();
    let scratch_path = scratch.path().to_str().unwrap();
    let for_dir = get_local_file_modified_time(scratch_path);
    assert!(for_dir.is_some());
}
/// Merges two batches whose `id` columns overlap on 2 and 3 and expects one
/// output batch with four rows when de-duplicating on `id`.
#[test]
fn test_combine_unique_batches_comprehensive() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ]));
    // Small factory so both sides are built the same way.
    let make_batch = |ids: Vec<i64>, names: Vec<&str>| {
        let columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from(ids)), Arc::new(StringArray::from(names))];
        RecordBatch::try_new(Arc::clone(&schema), columns).unwrap()
    };
    let local_batch = make_batch(vec![1, 2, 3], vec!["a", "b", "c"]);
    let s3_batch = make_batch(vec![2, 3, 4], vec!["b", "c", "d"]);

    let outcome = combine_unique_batches(vec![local_batch], vec![s3_batch], &["id".to_string()]);

    assert!(outcome.is_ok());
    let merged = outcome.unwrap();
    assert_eq!(merged.len(), 1);
    assert_eq!(merged[0].num_rows(), 4);
}
/// Smoke test: `build_rules_tree` must not panic on a schema mixing `unique`
/// flags with `min`/`max` bounds across int, float, string and bool fields.
///
/// The previous `rules.len() >= 0` assertion was always true for an unsigned
/// length, so it has been dropped; completing the call is the whole check.
#[test]
fn test_build_rules_tree_comprehensive() {
    let comprehensive_schema = json!({
        "id": {"type": "int", "unique": true, "min": 1, "max": 1000},
        "name": {"type": "string", "unique": true},
        "age": {"type": "int", "min": 0, "max": 150},
        "score": {"type": "float", "min": 0.0, "max": 100.0},
        "email": {"type": "string", "unique": true},
        "active": {"type": "bool"},
        "tags": {"type": "string", "unique": false}
    });

    let _rules = build_rules_tree(comprehensive_schema);
}
/// Writes four dated files into a temporary directory, confirms they exist,
/// then runs `cleanup_old_files` over them.
///
/// NOTE(review): the final assertion only checks the length of the local path
/// vector; it does not observe which files, if any, were removed.
#[tokio::test]
async fn test_cleanup_old_files_comprehensive() {
    let scratch = tempfile::tempdir().unwrap();
    let files: Vec<_> = [
        "data_2020-01-01.txt",
        "data_2023-01-01.txt",
        "data_2030-01-01.txt",
        "data_2022-12-31.txt",
    ]
    .iter()
    .map(|name| scratch.path().join(name))
    .collect();

    for path in &files {
        fs::write(path, "content").unwrap();
    }
    for path in &files {
        assert!(path.exists());
    }

    cleanup_old_files(&files).await;

    assert_eq!(files.len(), 4);
}
/// Smoke test: calls `read_parquet_batches` on an empty temporary file and
/// deliberately ignores the outcome; only a panic would fail this test.
#[test]
fn test_read_parquet_batches() {
    let empty_file = tempfile::NamedTempFile::new().unwrap();
    let mut collected = Vec::new();

    let _ = read_parquet_batches(empty_file.path(), &mut collected);
}
/// Smoke test: `json_to_arrow` must not panic when given no rows.
///
/// Either `Ok` or `Err` is acceptable here, so the former
/// `is_ok() || is_err()` assertion, which could never fail, has been replaced
/// by simply completing the call.
#[test]
fn test_json_to_arrow_with_empty_data() {
    let empty_data: Vec<serde_json::Value> = vec![];

    let _outcome = json_to_arrow(&empty_data, None, None);
}
/// Checks that two fully populated objects convert without error.
///
/// NOTE(review): despite the test name, none of the input values is null.
#[test]
fn test_json_to_arrow_with_null_values() {
    let rows = vec![
        json!({"id": 1, "name": "test", "value": 10.5}),
        json!({"id": 2, "name": "test2", "value": 20.0}),
    ];

    let conversion = json_to_arrow(&rows, None, None);

    assert!(conversion.is_ok());
}
/// A schema whose field definitions are nested one and two levels deep must
/// still be accepted; only success is checked, not the returned names.
#[test]
fn test_get_property_fields_with_nested_schema() {
    let nested_defs = json!({
        "user": {
            "id": {"type": "int", "unique": true},
            "profile": {
                "name": {"type": "string", "unique": false},
                "email": {"type": "string", "unique": true}
            }
        }
    });

    let lookup = get_property_fields(&nested_defs, "unique");

    assert!(lookup.is_ok());
}
/// Smoke test: `get_local_file_modified_time` must not panic for absolute,
/// relative, parent-relative, space-containing and non-ASCII paths.
///
/// Whether a time is returned depends on the machine running the test, so the
/// former `is_some() || is_none()` assertion, which could never fail, has
/// been replaced by simply completing each call.
#[test]
fn test_get_local_file_modified_time_with_special_paths() {
    let special_paths = [
        "/tmp/test_file.txt",
        "./relative/path/file.txt",
        "../parent/file.txt",
        "file_with_spaces.txt",
        "file_with_unicode_测试.txt",
    ];

    for path in special_paths {
        let _outcome = get_local_file_modified_time(path);
    }
}
/// Smoke test: combining a two-column batch with a three-column batch (the
/// second adds an `extra` column) must not panic.
///
/// Either `Ok` or `Err` is acceptable here, so the former
/// `is_ok() || is_err()` assertion, which could never fail, has been replaced
/// by simply completing the call.
#[test]
fn test_combine_unique_batches_with_different_schemas() {
    let schema1 = Schema::new(vec![Field::new("id", DataType::Int64, false), Field::new("name", DataType::Utf8, false)]);
    let schema2 = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("extra", DataType::Utf8, false),
    ]);

    let id_array1: ArrayRef = Arc::new(Int64Array::from(vec![1, 2]));
    let name_array1: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"]));
    let batch1 = RecordBatch::try_new(Arc::new(schema1), vec![id_array1, name_array1]).unwrap();

    let id_array2: ArrayRef = Arc::new(Int64Array::from(vec![2, 3]));
    let name_array2: ArrayRef = Arc::new(StringArray::from(vec!["b", "c"]));
    let extra_array2: ArrayRef = Arc::new(StringArray::from(vec!["x", "y"]));
    let batch2 = RecordBatch::try_new(Arc::new(schema2), vec![id_array2, name_array2, extra_array2]).unwrap();

    let _outcome = combine_unique_batches(vec![batch1], vec![batch2], &["id".to_string()]);
}
/// Smoke test: `build_rules_tree` must not panic when a field carries an
/// empty `rules` array or no rule-related keys at all.
///
/// The previous `rules.len() >= 0` assertion was always true for an unsigned
/// length, so it has been dropped; completing the call is the whole check.
#[test]
fn test_build_rules_tree_with_empty_rules() {
    let empty_rules_schema = json!({
        "id": {"type": "int"},
        "name": {"type": "string"},
        "age": {"type": "int", "rules": []},
    });

    let _rules = build_rules_tree(empty_rules_schema);
}
/// Runs `cleanup_old_files` over file names containing spaces and non-ASCII
/// characters.
///
/// NOTE(review): the final assertion only checks the length of the local path
/// vector; it does not observe whether any file was removed.
#[tokio::test]
async fn test_cleanup_old_files_with_special_characters() {
    let scratch = tempfile::tempdir().unwrap();
    let files: Vec<_> = [
        "data_2020-01-01.txt",
        "data with spaces_2020-01-01.txt",
        "data_with_unicode_测试_2020-01-01.txt",
    ]
    .iter()
    .map(|name| scratch.path().join(name))
    .collect();

    for path in &files {
        fs::write(path, "content").unwrap();
    }

    cleanup_old_files(&files).await;

    assert_eq!(files.len(), 3);
}
/// Smoke test: `read_parquet_batches` must not panic when pointed at a path
/// that does not exist.
///
/// The former `is_ok() || is_err()` assertion could never fail, so it has
/// been replaced by simply completing the call.
#[test]
fn test_read_parquet_batches_with_invalid_path() {
    let invalid_path = std::path::Path::new("/nonexistent/file.parquet");
    let mut batches = Vec::new();

    let _outcome = read_parquet_batches(invalid_path, &mut batches);
}
/// Checks that rounding a fixed timestamp with interval `1` gives a
/// non-empty result.
///
/// NOTE(review): despite the test name, the interval passed is 1, not 0.
#[test]
fn test_rounded_timestamp_with_zero_interval() {
    let fixed_timestamp = 1672531200;

    let rounded = rounded_timestamp(fixed_timestamp, 1);

    assert!(!rounded.is_empty());
}
/// Smoke test: `get_local_file_modified_time` must not panic for `/tmp` and
/// `/dev/null`.
///
/// NOTE(review): despite the test name, these are ordinarily valid paths on
/// Unix-like systems. Whether a time is returned depends on the platform, so
/// the former `is_some() || is_none()` assertion, which could never fail, has
/// been replaced by simply completing each call.
#[test]
fn test_get_local_file_modified_time_with_invalid_paths() {
    let system_paths = ["/tmp", "/dev/null"];

    for path in system_paths {
        let _outcome = get_local_file_modified_time(path);
    }
}
/// Smoke test: combining a batch with a copy of itself while passing an empty
/// list of unique fields must not panic.
///
/// Either `Ok` or `Err` is acceptable here, so the former
/// `is_ok() || is_err()` assertion, which could never fail, has been replaced
/// by simply completing the call.
#[test]
fn test_combine_unique_batches_with_empty_unique_fields() {
    let schema = Schema::new(vec![Field::new("id", DataType::Int64, false), Field::new("name", DataType::Utf8, false)]);
    let id_array: ArrayRef = Arc::new(Int64Array::from(vec![1, 2]));
    let name_array: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"]));
    let batch = RecordBatch::try_new(Arc::new(schema), vec![id_array, name_array]).unwrap();

    let _outcome = combine_unique_batches(vec![batch.clone()], vec![batch], &[]);
}
/// Smoke test: `build_rules_tree` must not panic when `rules` is a string
/// instead of an array, or when a rule object lacks the expected keys.
///
/// The previous `rules.len() >= 0` assertion was always true for an unsigned
/// length, so it has been dropped; completing each call is the whole check.
#[test]
fn test_build_rules_tree_with_invalid_rules() {
    let invalid_schemas = vec![
        json!({"field": {"type": "int", "rules": "not an array"}}),
        json!({"field": {"type": "int", "rules": [{"invalid": "rule"}]}}),
    ];

    for schema in invalid_schemas {
        let _rules = build_rules_tree(schema);
    }
}
/// Runs `cleanup_old_files` over one file name with no date in it and one
/// with an out-of-range month (`13`).
///
/// NOTE(review): the final assertion only checks the length of the local path
/// vector; it does not observe whether any file was removed.
#[tokio::test]
async fn test_cleanup_old_files_with_invalid_dates() {
    let scratch = tempfile::tempdir().unwrap();
    let files: Vec<_> = ["data_invalid_date.txt", "data_2023-13-01.txt"]
        .iter()
        .map(|name| scratch.path().join(name))
        .collect();

    for path in &files {
        fs::write(path, "content").unwrap();
    }

    cleanup_old_files(&files).await;

    assert_eq!(files.len(), 2);
}
/// Smoke test: `read_parquet_batches` must not panic when handed a directory
/// instead of a file.
///
/// The former `is_ok() || is_err()` assertion could never fail, so it has
/// been replaced by simply completing the call.
#[test]
fn test_read_parquet_batches_with_directory() {
    let temp_dir = tempfile::tempdir().unwrap();
    let mut batches = Vec::new();

    let _outcome = read_parquet_batches(temp_dir.path(), &mut batches);
}
/// Checks that an Int32 column comes out as JSON numbers, row by row.
#[test]
fn test_record_batches_to_json_int32() {
    let ids: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(schema, vec![ids]).unwrap();

    let rows = record_batches_to_json(&[batch]).unwrap();

    let expected = [json!(1), json!(2), json!(3)];
    for (index, want) in expected.iter().enumerate() {
        assert_eq!(&rows[index]["id"], want);
    }
}
/// Checks that a nullable string column renders strings as JSON strings and
/// the null entry as JSON `null`.
///
/// NOTE(review): despite the test name, the column is `Utf8` backed by a
/// `StringArray`, not `Utf8View`.
#[test]
fn test_record_batches_to_json_utf8view() {
    let names: ArrayRef = Arc::new(StringArray::from(vec![Some("Alice"), None, Some("Bob")]));
    let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)]));
    let batch = RecordBatch::try_new(schema, vec![names]).unwrap();

    let rows = record_batches_to_json(&[batch]).unwrap();

    let expected = [json!("Alice"), json!(null), json!("Bob")];
    for (index, want) in expected.iter().enumerate() {
        assert_eq!(&rows[index]["name"], want);
    }
}
/// Checks that a millisecond-precision timestamp column is rendered as a
/// JSON number.
///
/// NOTE(review): despite the test name, the batch has always been built from
/// a schema WITHOUT a timezone. The timezone string and the timezone-carrying
/// schema that used to be constructed here never reached the batch, so those
/// dead locals have been removed; the data under test is unchanged.
#[test]
fn test_record_batches_to_json_timestamp_millisecond_with_timezone() {
    let timestamp_array: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![1672531200000i64]));
    let schema_no_tz = Schema::new(vec![Field::new("timestamp", DataType::Timestamp(TimeUnit::Millisecond, None), false)]);
    let batch = RecordBatch::try_new(Arc::new(schema_no_tz), vec![timestamp_array]).unwrap();

    let result = record_batches_to_json(&[batch]).unwrap();

    assert!(result[0]["timestamp"].is_number());
}
/// Checks that a nanosecond-precision timestamp column without a timezone is
/// rendered as a JSON string.
#[test]
fn test_record_batches_to_json_timestamp_nanosecond_no_timezone() {
    let instants: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![1672531200000000000i64]));
    let column_type = DataType::Timestamp(TimeUnit::Nanosecond, None);
    let schema = Arc::new(Schema::new(vec![Field::new("timestamp", column_type, false)]));
    let batch = RecordBatch::try_new(schema, vec![instants]).unwrap();

    let rows = record_batches_to_json(&[batch]).unwrap();

    let rendered = &rows[0]["timestamp"];
    assert!(rendered.is_string());
}
/// Checks that a nanosecond-precision timestamp column is rendered as a JSON
/// string.
///
/// NOTE(review): despite the test name, the column type carries no timezone.
#[test]
fn test_record_batches_to_json_timestamp_nanosecond_with_timezone() {
    let instants: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![1672531200000000000i64]));
    let column_type = DataType::Timestamp(TimeUnit::Nanosecond, None);
    let schema = Arc::new(Schema::new(vec![Field::new("timestamp", column_type, false)]));
    let batch = RecordBatch::try_new(schema, vec![instants]).unwrap();

    let rows = record_batches_to_json(&[batch]).unwrap();

    let rendered = &rows[0]["timestamp"];
    assert!(rendered.is_string());
}
/// Checks that the first two Date32 values come out as either a JSON string
/// or JSON `null`; the third value is not inspected.
#[test]
fn test_record_batches_to_json_date32() {
    let days: ArrayRef = Arc::new(Date32Array::from(vec![0, 18628, 19000]));
    let schema = Arc::new(Schema::new(vec![Field::new("date", DataType::Date32, false)]));
    let batch = RecordBatch::try_new(schema, vec![days]).unwrap();

    let rows = record_batches_to_json(&[batch]).unwrap();

    for index in 0..2 {
        let rendered = &rows[index]["date"];
        assert!(rendered.is_string() || rendered.is_null());
    }
}
/// Checks that a `List<Int64>` column is rendered as one JSON array of
/// numbers per row.
#[test]
fn test_record_batches_to_json_list() {
    let item_field = Field::new("item", DataType::Int64, true);
    let schema = Schema::new(vec![Field::new("list", DataType::List(Arc::new(item_field)), false)]);

    // One inner Vec per row of the list column.
    let mut builder = ListBuilder::new(Int64Builder::new());
    for row in [vec![1, 2], vec![3, 4, 5], vec![6]] {
        for item in row {
            builder.values().append_value(item);
        }
        builder.append(true);
    }
    let lists: ArrayRef = Arc::new(builder.finish());
    let batch = RecordBatch::try_new(Arc::new(schema), vec![lists]).unwrap();

    let rows = record_batches_to_json(&[batch]).unwrap();

    let expected = [json!([1, 2]), json!([3, 4, 5]), json!([6])];
    for (index, want) in expected.iter().enumerate() {
        assert_eq!(&rows[index]["list"], want);
    }
}
/// Checks that a `List<Utf8>` column is rendered as one JSON array of
/// strings per row.
#[test]
fn test_record_batches_to_json_list_strings() {
    let item_field = Field::new("item", DataType::Utf8, true);
    let schema = Schema::new(vec![Field::new("tags", DataType::List(Arc::new(item_field)), false)]);

    // One inner Vec per row of the list column.
    let mut builder = ListBuilder::new(StringBuilder::new());
    for row in [vec!["a", "b"], vec!["c"]] {
        for tag in row {
            builder.values().append_value(tag);
        }
        builder.append(true);
    }
    let lists: ArrayRef = Arc::new(builder.finish());
    let batch = RecordBatch::try_new(Arc::new(schema), vec![lists]).unwrap();

    let rows = record_batches_to_json(&[batch]).unwrap();

    let expected = [json!(["a", "b"]), json!(["c"])];
    for (index, want) in expected.iter().enumerate() {
        assert_eq!(&rows[index]["tags"], want);
    }
}
/// Checks that a `List<Float64>` column is rendered as one JSON array of
/// floats per row.
#[test]
fn test_record_batches_to_json_list_float64() {
    let item_field = Field::new("item", DataType::Float64, true);
    let schema = Schema::new(vec![Field::new("scores", DataType::List(Arc::new(item_field)), false)]);

    // One inner Vec per row of the list column.
    let mut builder = ListBuilder::new(Float64Builder::new());
    for row in [vec![95.5, 88.0], vec![92.3]] {
        for score in row {
            builder.values().append_value(score);
        }
        builder.append(true);
    }
    let lists: ArrayRef = Arc::new(builder.finish());
    let batch = RecordBatch::try_new(Arc::new(schema), vec![lists]).unwrap();

    let rows = record_batches_to_json(&[batch]).unwrap();

    let expected = [json!([95.5, 88.0]), json!([92.3])];
    for (index, want) in expected.iter().enumerate() {
        assert_eq!(&rows[index]["scores"], want);
    }
}
/// Checks that a `List<Boolean>` column is rendered as one JSON array of
/// booleans per row.
#[test]
fn test_record_batches_to_json_list_boolean() {
    let item_field = Field::new("item", DataType::Boolean, true);
    let schema = Schema::new(vec![Field::new("flags", DataType::List(Arc::new(item_field)), false)]);

    // One inner Vec per row of the list column.
    let mut builder = ListBuilder::new(BooleanBuilder::new());
    for row in [vec![true, false], vec![true]] {
        for flag in row {
            builder.values().append_value(flag);
        }
        builder.append(true);
    }
    let lists: ArrayRef = Arc::new(builder.finish());
    let batch = RecordBatch::try_new(Arc::new(schema), vec![lists]).unwrap();

    let rows = record_batches_to_json(&[batch]).unwrap();

    let expected = [json!([true, false]), json!([true])];
    for (index, want) in expected.iter().enumerate() {
        assert_eq!(&rows[index]["flags"], want);
    }
}
/// Pins the current output for a list whose element type is Int32: the row
/// is rendered as an empty JSON array even though the list holds one value.
#[test]
fn test_record_batches_to_json_list_other_types() {
    use datafusion::arrow::array::Int32Builder;

    let item_field = Field::new("item", DataType::Int32, true);
    let schema = Schema::new(vec![Field::new("values", DataType::List(Arc::new(item_field)), false)]);

    let mut builder = ListBuilder::new(Int32Builder::new());
    builder.values().append_value(1);
    builder.append(true);
    let lists: ArrayRef = Arc::new(builder.finish());
    let batch = RecordBatch::try_new(Arc::new(schema), vec![lists]).unwrap();

    let rows = record_batches_to_json(&[batch]).unwrap();

    let rendered = &rows[0]["values"];
    assert_eq!(*rendered, json!([]));
}
/// Checks that a nullable string column renders strings as JSON strings and
/// the null entry as JSON `null`.
///
/// The schema used to be built twice: once into an unused `_schema` binding
/// and once inline. It is now built once and passed to the batch.
///
/// NOTE(review): despite the test name, the column is `Utf8` backed by a
/// `StringArray`, not a string-view type.
#[test]
fn test_record_batches_to_json_stringview_null() {
    let schema = Schema::new(vec![Field::new("name", DataType::Utf8, true)]);
    let name_array: ArrayRef = Arc::new(StringArray::from(vec![Some("Alice"), None, Some("Bob")]));
    let batch = RecordBatch::try_new(Arc::new(schema), vec![name_array]).unwrap();

    let result = record_batches_to_json(&[batch]).unwrap();

    assert_eq!(result[0]["name"], json!("Alice"));
    assert_eq!(result[1]["name"], json!(null));
    assert_eq!(result[2]["name"], json!("Bob"));
}
/// Checks that a string-list field which is `null` in one of three rows
/// still converts without error.
///
/// The trailing `if let Ok(..) = result {}` had an empty body and checked
/// nothing, so it has been removed.
#[test]
fn test_json_to_arrow_missing_list_values() {
    let json_data = vec![
        json!({"tags": ["a", "b"]}),
        json!({"tags": null}),
        json!({"tags": ["c"]}),
    ];

    let result = json_to_arrow(&json_data, None, None);

    assert!(result.is_ok());
}
/// Checks that an integer-list field which is `null` in the middle row still
/// converts without error.
#[test]
fn test_json_to_arrow_missing_int64_list() {
    let rows = vec![
        json!({"numbers": [1, 2]}),
        json!({"numbers": null}),
        json!({"numbers": [3]}),
    ];

    let conversion = json_to_arrow(&rows, None, None);

    assert!(conversion.is_ok());
}
/// Checks that a float-list field which is `null` in the middle row still
/// converts without error.
#[test]
fn test_json_to_arrow_missing_float64_list() {
    let rows = vec![
        json!({"scores": [95.5, 88.0]}),
        json!({"scores": null}),
        json!({"scores": [92.3]}),
    ];

    let conversion = json_to_arrow(&rows, None, None);

    assert!(conversion.is_ok());
}
/// Checks that a boolean-list field which is `null` in the middle row still
/// converts without error.
#[test]
fn test_json_to_arrow_missing_boolean_list() {
    let rows = vec![
        json!({"flags": [true, false]}),
        json!({"flags": null}),
        json!({"flags": [true]}),
    ];

    let conversion = json_to_arrow(&rows, None, None);

    assert!(conversion.is_ok());
}
/// Checks that a list field named `items`, `null` in the middle row, still
/// converts without error.
///
/// NOTE(review): the elements are plain integers, the same shape as in the
/// int64-list test; no other element type is exercised.
#[test]
fn test_json_to_arrow_missing_other_list_types() {
    let rows = vec![
        json!({"items": [1, 2]}),
        json!({"items": null}),
        json!({"items": [3]}),
    ];

    let conversion = json_to_arrow(&rows, None, None);

    assert!(conversion.is_ok());
}
/// Checks that a nullable struct column built without a null buffer renders
/// its first row as a JSON object exposing the inner `id`.
#[test]
fn test_record_batches_to_json_struct_null_check() {
    let member_fields = vec![Field::new("id", DataType::Int64, false)];
    let person_type = DataType::Struct(member_fields.clone().into());
    let schema = Schema::new(vec![Field::new("person", person_type, true)]);

    let ids: ArrayRef = Arc::new(Int64Array::from(vec![1, 2]));
    // `None` as the last argument: no validity buffer, so no struct row is null.
    let people = StructArray::try_new(member_fields.into(), vec![ids], None).unwrap();
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(people)]).unwrap();

    let rows = record_batches_to_json(&[batch]).unwrap();

    let first_person = &rows[0]["person"];
    assert!(first_person.is_object());
    assert_eq!(first_person["id"], json!(1));
}
/// Smoke test: combining an `id`-only batch with a batch that also has a
/// `name` column must not panic.
///
/// Either `Ok` or `Err` is acceptable here, so the former
/// `is_ok() || is_err()` assertion, which could never fail, has been replaced
/// by simply completing the call.
#[test]
fn test_combine_unique_batches_convert_schema_list() {
    let schema1 = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    let id_array1: ArrayRef = Arc::new(Int64Array::from(vec![1, 2]));
    let batch1 = RecordBatch::try_new(Arc::new(schema1), vec![id_array1]).unwrap();

    let schema2 = Schema::new(vec![Field::new("id", DataType::Int64, false), Field::new("name", DataType::Utf8, false)]);
    let id_array2: ArrayRef = Arc::new(Int64Array::from(vec![3]));
    let name_array2: ArrayRef = Arc::new(StringArray::from(vec!["test"]));
    let batch2 = RecordBatch::try_new(Arc::new(schema2), vec![id_array2, name_array2]).unwrap();

    let _outcome = combine_unique_batches(vec![batch1], vec![batch2], &["id".to_string()]);
}
/// Smoke test: combining two batches whose `value` column has conflicting
/// types (Utf8 vs Int64) must not panic.
///
/// Either `Ok` or `Err` is acceptable here, so the former
/// `is_ok() || is_err()` assertion, which could never fail, has been replaced
/// by simply completing the call.
#[test]
fn test_combine_unique_batches_convert_schema_warning() {
    let schema1 = Schema::new(vec![Field::new("value", DataType::Utf8, false)]);
    let str_array: ArrayRef = Arc::new(StringArray::from(vec!["test"]));
    let batch1 = RecordBatch::try_new(Arc::new(schema1), vec![str_array]).unwrap();

    let schema2 = Schema::new(vec![Field::new("value", DataType::Int64, false)]);
    let int_array: ArrayRef = Arc::new(Int64Array::from(vec![1]));
    let batch2 = RecordBatch::try_new(Arc::new(schema2), vec![int_array]).unwrap();

    let _outcome = combine_unique_batches(vec![batch1], vec![batch2], &["value".to_string()]);
}
/// Smoke test: combining a batch that lacks the `name` column with one that
/// has it (declared non-nullable) must not panic.
///
/// Either `Ok` or `Err` is acceptable here, so the former
/// `is_ok() || is_err()` assertion, which could never fail, has been replaced
/// by simply completing the call.
#[test]
fn test_combine_unique_batches_missing_column_null() {
    let schema1 = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    let id_array: ArrayRef = Arc::new(Int64Array::from(vec![1]));
    let batch1 = RecordBatch::try_new(Arc::new(schema1), vec![id_array]).unwrap();

    let schema2 = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ]);
    let id_array2: ArrayRef = Arc::new(Int64Array::from(vec![2]));
    let name_array2: ArrayRef = Arc::new(StringArray::from(vec!["test"]));
    let batch2 = RecordBatch::try_new(Arc::new(schema2), vec![id_array2, name_array2]).unwrap();

    let _outcome = combine_unique_batches(vec![batch1], vec![batch2], &["id".to_string()]);
}
#[tokio::test]
async fn test_cleanup_old_files_error_path() {
use std::path::PathBuf;
use tempfile::TempDir;
let temp_dir = TempDir::new().unwrap();
let old_file = temp_dir.path().join("data_2020-01-01.parquet");
std::fs::write(&old_file, "old data").unwrap();
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = std::fs::metadata(&old_file).unwrap().permissions();
perms.set_mode(0o000); std::fs::set_permissions(&old_file, perms).unwrap();
}
let files = vec![PathBuf::from(&old_file)];
cleanup_old_files(&files).await;
}
/// A float field declaring only `min` must produce exactly one rule.
#[test]
fn test_build_rules_tree_float_min() {
    let field_defs = json!({
        "value": {"type": "float", "min": 0.0}
    });

    let rule_count = build_rules_tree(field_defs).len();

    assert_eq!(rule_count, 1);
}
/// A float field declaring only `max` must produce exactly one rule.
#[test]
fn test_build_rules_tree_float_max() {
    let field_defs = json!({
        "value": {"type": "float", "max": 100.0}
    });

    let rule_count = build_rules_tree(field_defs).len();

    assert_eq!(rule_count, 1);
}
/// A float field declaring both `min` and `max` must produce two rules.
#[test]
fn test_build_rules_tree_float_both() {
    let field_defs = json!({
        "value": {"type": "float", "min": 0.0, "max": 100.0}
    });

    let rule_count = build_rules_tree(field_defs).len();

    assert_eq!(rule_count, 2);
}
/// Checks that a single row holding an integer list converts without error.
///
/// NOTE(review): despite the test name, the input is a plain integer list
/// and the conversion is expected to succeed.
#[test]
fn test_json_to_arrow_unsupported_list_type() {
    let rows = vec![json!({"items": [1, 2]})];

    let conversion = json_to_arrow(&rows, None, None);

    assert!(conversion.is_ok());
}
/// Smoke test: combining a batch whose `id` is Int64 with one whose `id` is
/// `List<Int64>` must not panic.
///
/// Either `Ok` or `Err` is acceptable here, so the former
/// `is_ok() || is_err()` assertion, which could never fail, has been replaced
/// by simply completing the call.
#[test]
fn test_combine_unique_batches_list_conversion() {
    let schema1 = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    let id_array1: ArrayRef = Arc::new(Int64Array::from(vec![1, 2]));
    let batch1 = RecordBatch::try_new(Arc::new(schema1), vec![id_array1]).unwrap();

    let inner_field = Field::new("item", DataType::Int64, true);
    let list_field = Field::new("id", DataType::List(Arc::new(inner_field)), false);
    let schema2 = Schema::new(vec![list_field]);
    let mut list_builder = ListBuilder::new(Int64Builder::new());
    list_builder.values().append_value(3);
    list_builder.append(true);
    let list_array: ArrayRef = Arc::new(list_builder.finish());
    let batch2 = RecordBatch::try_new(Arc::new(schema2), vec![list_array]).unwrap();

    let _outcome = combine_unique_batches(vec![batch1], vec![batch2], &["id".to_string()]);
}
/// Reading an empty temporary file as Parquet must report an error.
///
/// NOTE(review): despite the test name, the expected outcome is `Err`.
#[test]
fn test_read_parquet_batches_success_path() {
    let empty_file = tempfile::NamedTempFile::new().unwrap();
    let mut collected = Vec::new();

    let outcome = read_parquet_batches(empty_file.path(), &mut collected);

    assert!(outcome.is_err());
}
/// Checks that a single-row nullable struct column renders as a JSON object.
///
/// NOTE(review): despite the test name, no null buffer is supplied, so the
/// struct row is not null.
#[test]
fn test_record_batches_to_json_struct_with_null() {
    let member_fields = vec![Field::new("id", DataType::Int64, false)];
    let person_type = DataType::Struct(member_fields.clone().into());
    let schema = Schema::new(vec![Field::new("person", person_type, true)]);

    let ids: ArrayRef = Arc::new(Int64Array::from(vec![1]));
    let people = StructArray::try_new(member_fields.into(), vec![ids], None).unwrap();
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(people)]).unwrap();

    let rows = record_batches_to_json(&[batch]).unwrap();

    let first_person = &rows[0]["person"];
    assert!(first_person.is_object());
}
/// Checks that the first row of an Int64 column renders as the JSON number 1.
///
/// NOTE(review): despite the test name, Int64 is a type the other tests in
/// this file show being converted normally.
#[test]
fn test_record_batches_to_json_unsupported_datatype() {
    let ids: ArrayRef = Arc::new(Int64Array::from(vec![1, 2]));
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(schema, vec![ids]).unwrap();

    let rows = record_batches_to_json(&[batch]).unwrap();

    let first_id = &rows[0]["id"];
    assert_eq!(*first_id, json!(1));
}
/// Checks that a two-field struct column renders each row as a nested JSON
/// object carrying both `id` and `name`.
#[test]
fn test_record_batches_to_json_struct() {
    let member_fields = vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ];
    let person_type = DataType::Struct(member_fields.clone().into());
    let schema = Schema::new(vec![Field::new("person", person_type, false)]);

    let ids: ArrayRef = Arc::new(Int64Array::from(vec![1, 2]));
    let names: ArrayRef = Arc::new(StringArray::from(vec!["Alice", "Bob"]));
    let people = StructArray::try_new(member_fields.into(), vec![ids, names], None).unwrap();
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(people)]).unwrap();

    let rows = record_batches_to_json(&[batch]).unwrap();

    let expected = [(json!(1), json!("Alice")), (json!(2), json!("Bob"))];
    for (index, (want_id, want_name)) in expected.iter().enumerate() {
        let person = &rows[index]["person"];
        assert_eq!(&person["id"], want_id);
        assert_eq!(&person["name"], want_name);
    }
}
/// Checks that a single-row nullable struct column renders as a JSON object.
///
/// NOTE(review): despite the test name, no null buffer is supplied, so the
/// struct row is not null.
#[test]
fn test_record_batches_to_json_struct_null() {
    let member_fields = vec![Field::new("id", DataType::Int64, false)];
    let person_type = DataType::Struct(member_fields.clone().into());
    let schema = Schema::new(vec![Field::new("person", person_type, true)]);

    let ids: ArrayRef = Arc::new(Int64Array::from(vec![1]));
    let people = StructArray::try_new(member_fields.into(), vec![ids], None).unwrap();
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(people)]).unwrap();

    let rows = record_batches_to_json(&[batch]).unwrap();

    let first_person = &rows[0]["person"];
    assert!(first_person.is_object());
}
/// Checks that a `Boolean` column is rendered as JSON booleans, row by row.
#[test]
fn test_record_batches_to_json_boolean() {
    let expected = [true, false, true];

    let schema = Schema::new(vec![Field::new("active", DataType::Boolean, false)]);
    let column: ArrayRef = Arc::new(BooleanArray::from(expected.to_vec()));
    let batch = RecordBatch::try_new(Arc::new(schema), vec![column]).unwrap();

    let rows = record_batches_to_json(&[batch]).unwrap();
    for (idx, flag) in expected.iter().enumerate() {
        assert_eq!(rows[idx]["active"], json!(flag));
    }
}
/// Checks that a `Float64` column is rendered as JSON numbers, row by row.
#[test]
fn test_record_batches_to_json_float64() {
    let expected = [95.5, 88.0, 92.3];

    let schema = Schema::new(vec![Field::new("score", DataType::Float64, false)]);
    let column: ArrayRef = Arc::new(Float64Array::from(expected.to_vec()));
    let batch = RecordBatch::try_new(Arc::new(schema), vec![column]).unwrap();

    let rows = record_batches_to_json(&[batch]).unwrap();
    for (idx, score) in expected.iter().enumerate() {
        assert_eq!(rows[idx]["score"], json!(score));
    }
}
/// Feeds three rows carrying string lists under `tags` into `json_to_arrow`
/// and checks the result is one field / one column of three entries.
#[test]
fn test_json_to_arrow_with_list_strings() {
    let rows = vec![json!({"tags": ["a", "b", "c"]}), json!({"tags": ["d", "e"]}), json!({"tags": ["f"]})];

    let (columns, schema) = json_to_arrow(&rows, None, None).unwrap();

    assert_eq!(schema.fields().len(), 1);
    assert_eq!(columns.len(), 1);
    assert_eq!(columns[0].len(), 3);
}
/// Feeds three rows carrying integer lists under `numbers` into
/// `json_to_arrow` and checks the result is one field / one column of three
/// entries.
#[test]
fn test_json_to_arrow_with_list_int64() {
    let rows = vec![json!({"numbers": [1, 2, 3]}), json!({"numbers": [4, 5]}), json!({"numbers": [6]})];

    let (columns, schema) = json_to_arrow(&rows, None, None).unwrap();

    assert_eq!(schema.fields().len(), 1);
    assert_eq!(columns.len(), 1);
    assert_eq!(columns[0].len(), 3);
}
/// Feeds three rows carrying float lists under `scores` into `json_to_arrow`
/// and checks the result is one field / one column of three entries.
#[test]
fn test_json_to_arrow_with_list_float64() {
    let rows = vec![json!({"scores": [95.5, 88.0]}), json!({"scores": [92.3]}), json!({"scores": [85.0]})];

    let (columns, schema) = json_to_arrow(&rows, None, None).unwrap();

    assert_eq!(schema.fields().len(), 1);
    assert_eq!(columns.len(), 1);
    assert_eq!(columns[0].len(), 3);
}
/// Feeds three rows carrying boolean lists under `flags` into `json_to_arrow`
/// and checks the result is one field / one column of three entries.
#[test]
fn test_json_to_arrow_with_list_boolean() {
    let rows = vec![json!({"flags": [true, false, true]}), json!({"flags": [false]}), json!({"flags": [true]})];

    let (columns, schema) = json_to_arrow(&rows, None, None).unwrap();

    assert_eq!(schema.fields().len(), 1);
    assert_eq!(columns.len(), 1);
    assert_eq!(columns[0].len(), 3);
}
/// Checks that an empty list sandwiched between two non-empty integer lists
/// still converts to a single field and a single column.
#[test]
fn test_json_to_arrow_with_empty_array() {
    let rows = vec![json!({"items": [1]}), json!({"items": []}), json!({"items": [2]})];

    let (columns, schema) = json_to_arrow(&rows, None, None).unwrap();

    assert_eq!(schema.fields().len(), 1);
    assert_eq!(columns.len(), 1);
}
/// Converts three rows of string lists under `tags` and checks the result is
/// one field / one column of three entries.
///
/// Note: every row in this fixture provides `tags`; none is actually missing.
#[test]
fn test_json_to_arrow_with_missing_list_values() {
    let rows = vec![json!({"tags": ["a", "b"]}), json!({"tags": ["c"]}), json!({"tags": ["d", "e"]})];

    let (columns, schema) = json_to_arrow(&rows, None, None).unwrap();

    assert_eq!(schema.fields().len(), 1);
    assert_eq!(columns.len(), 1);
    assert_eq!(columns[0].len(), 3);
}
/// A column that starts with an integer and later sees a float must be
/// inferred as `Float64`.
#[test]
fn test_json_to_arrow_type_promotion_int64_to_float64() {
    let rows = vec![json!({"value": 1}), json!({"value": 2.5}), json!({"value": 3})];

    let (_, schema) = json_to_arrow(&rows, None, None).unwrap();

    assert_eq!(schema.fields().len(), 1);
    assert_eq!(schema.field(0).data_type(), &DataType::Float64);
}
/// A column that starts with a float and later sees an integer must stay
/// `Float64`.
#[test]
fn test_json_to_arrow_type_promotion_float64_to_float64() {
    let rows = vec![json!({"value": 1.5}), json!({"value": 2}), json!({"value": 3.7})];

    let (_, schema) = json_to_arrow(&rows, None, None).unwrap();

    assert_eq!(schema.fields().len(), 1);
    assert_eq!(schema.field(0).data_type(), &DataType::Float64);
}
/// A list containing a trailing `null` element must still convert into a
/// single field and a single column.
#[test]
fn test_json_to_arrow_with_null_values_in_lists() {
    let rows = vec![json!({"items": [1, 2, null]}), json!({"items": [3]})];

    let (columns, schema) = json_to_arrow(&rows, None, None).unwrap();

    assert_eq!(schema.fields().len(), 1);
    assert_eq!(columns.len(), 1);
}
/// Checks that an `items` column of integer lists is inferred as an Arrow
/// `List` type.
///
/// Note: both rows in this fixture hold integer lists; the element types are
/// not actually mixed.
#[test]
fn test_json_to_arrow_with_mixed_list_types() {
    let rows = vec![json!({"items": [1, 2]}), json!({"items": [3]})];

    let (_, schema) = json_to_arrow(&rows, None, None).unwrap();

    assert_eq!(schema.fields().len(), 1);
    let inferred = schema.field(0).data_type();
    assert!(matches!(inferred, DataType::List(_)));
}
/// Two integer values for the same key must be inferred as `Int64`.
#[test]
fn test_json_to_arrow_resolve_conflict_same_type() {
    let rows = vec![json!({"value": 1}), json!({"value": 2})];

    let (_, schema) = json_to_arrow(&rows, None, None).unwrap();

    assert_eq!(schema.fields().len(), 1);
    assert_eq!(schema.field(0).data_type(), &DataType::Int64);
}
/// A key seen first as a string and then as an integer must resolve to one
/// field whose type is either `Int64` or `Utf8`; the test accepts both.
#[test]
fn test_json_to_arrow_resolve_conflict_different_type() {
    let rows = vec![json!({"value": "string"}), json!({"value": 123})];

    let (_, schema) = json_to_arrow(&rows, None, None).unwrap();

    assert_eq!(schema.fields().len(), 1);
    let resolved = schema.field(0).data_type();
    assert!(matches!(resolved, DataType::Int64 | DataType::Utf8));
}
/// Smoke test: a list consisting only of `null`s followed by an integer list.
///
/// The outcome (`Ok` or `Err`) is deliberately not inspected; the test only
/// fails if `json_to_arrow` panics.
#[test]
fn test_json_to_arrow_with_null_value() {
    let rows = vec![json!({"items": [null, null]}), json!({"items": [1]})];

    let _ = json_to_arrow(&rows, None, None);
}
/// Smoke test: a nested JSON object followed by `null` under the same key.
///
/// The previous `assert!(result.is_ok() || result.is_err())` was a tautology
/// and could never fail. Either outcome is accepted here, so the result is
/// discarded explicitly; the test only fails if `json_to_arrow` panics.
#[test]
fn test_json_to_arrow_with_unsupported_datatype() {
    let json_data = vec![json!({"value": json!({"nested": "object"})}), json!({"value": null})];

    let _ = json_to_arrow(&json_data, None, None);
}
/// Smoke test: the first row holds an empty list, the second an integer list.
///
/// The outcome (`Ok` or `Err`) is deliberately not inspected; the test only
/// fails if `json_to_arrow` panics.
#[test]
fn test_json_to_arrow_empty_array_first() {
    let rows = vec![json!({"items": []}), json!({"items": [1]})];

    let _ = json_to_arrow(&rows, None, None);
}
/// Smoke test: combines an `id`-only batch with an `id` + `name` batch.
///
/// The previous `assert!(result.is_ok() || result.is_err())` was a tautology
/// and could never fail. Either outcome is accepted here, so the result is
/// discarded explicitly; the test only fails if `combine_unique_batches`
/// panics.
#[test]
fn test_combine_unique_batches_schema_mismatch() {
    let schema1 = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    let id_array1 = Arc::new(Int64Array::from(vec![1, 2]));
    let batch1 = RecordBatch::try_new(Arc::new(schema1), vec![id_array1]).unwrap();

    let schema2 = Schema::new(vec![Field::new("id", DataType::Int64, false), Field::new("name", DataType::Utf8, false)]);
    let id_array2 = Arc::new(Int64Array::from(vec![3, 4]));
    let name_array2 = Arc::new(StringArray::from(vec!["a", "b"]));
    let batch2 = RecordBatch::try_new(Arc::new(schema2), vec![id_array2, name_array2]).unwrap();

    let _ = combine_unique_batches(vec![batch1], vec![batch2], &["id".to_string()]);
}
/// Smoke test: combines two batches with the same fields (`id`, `name`)
/// declared in opposite order.
///
/// The previous `assert!(result.is_ok() || result.is_err())` was a tautology
/// and could never fail. Either outcome is accepted here, so the result is
/// discarded explicitly; the test only fails if `combine_unique_batches`
/// panics.
#[test]
fn test_combine_unique_batches_different_field_order() {
    let schema1 = Schema::new(vec![Field::new("id", DataType::Int64, false), Field::new("name", DataType::Utf8, false)]);
    let id_array1 = Arc::new(Int64Array::from(vec![1]));
    let name_array1 = Arc::new(StringArray::from(vec!["Alice"]));
    let batch1 = RecordBatch::try_new(Arc::new(schema1), vec![id_array1, name_array1]).unwrap();

    // Same fields as `schema1`, but `name` comes first.
    let schema2 = Schema::new(vec![Field::new("name", DataType::Utf8, false), Field::new("id", DataType::Int64, false)]);
    let name_array2 = Arc::new(StringArray::from(vec!["Bob"]));
    let id_array2 = Arc::new(Int64Array::from(vec![2]));
    let batch2 = RecordBatch::try_new(Arc::new(schema2), vec![name_array2, id_array2]).unwrap();

    let _ = combine_unique_batches(vec![batch1], vec![batch2], &["id".to_string()]);
}
/// Smoke test: calls `combine_unique_batches` with one `id`-only batch and an
/// empty second batch list.
///
/// The original built a `_target_schema` that was never used and ended with
/// the tautology `assert!(result.is_ok() || result.is_err())`; both are
/// removed. Either outcome is accepted, so the test only fails on a panic.
#[test]
fn test_convert_batch_schema_missing_column() {
    let source_schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    let id_array = Arc::new(Int64Array::from(vec![1, 2]));
    let source_batch = RecordBatch::try_new(Arc::new(source_schema), vec![id_array]).unwrap();

    let _ = crate::timon_engine::helpers::combine_unique_batches(vec![source_batch], vec![], &["id".to_string()]);
}
/// Only a boolean `true` marks a field as having the property: the string
/// `"yes"` and the number `1` must both be ignored, leaving `email` alone.
#[test]
fn test_get_property_fields_edge_cases() {
    let schema = json!({
        "id": {"type": "int", "unique": "yes"},
        "name": {"type": "string", "unique": 1},
        "email": {"type": "string", "unique": true}
    });

    let unique = get_property_fields(&schema, "unique").unwrap();

    assert_eq!(unique, vec!["email"]);
}
/// File names that carry directory prefixes are still filtered by date: the
/// two 2023 files are kept for a 2023 range and the 2024 one is dropped.
#[test]
fn test_filter_files_by_date_range_with_paths() {
    let candidates: Vec<String> = [
        "/data/2023/01/data_2023-01-15.parquet",
        "/data/2023/02/data_2023-02-20.parquet",
        "/data/2024/data_2024-01-01.parquet",
    ]
    .iter()
    .map(|path| path.to_string())
    .collect();

    let kept = filter_files_by_date_range(candidates, "2023-01-01", "2023-12-31").unwrap();

    assert_eq!(kept.len(), 2);
}
/// File names that carry only a year: a 2023 range must keep at least one
/// file, and one of the kept names must mention `2023`.
#[test]
fn test_filter_files_by_date_range_year_only() {
    let candidates = vec![String::from("data_2023.parquet"), String::from("data_2024.parquet")];

    let kept = filter_files_by_date_range(candidates, "2023-01-01", "2023-12-31").unwrap();

    assert!(!kept.is_empty());
    assert!(kept.iter().any(|name| name.contains("2023")));
}
/// File names that carry a year and month: a January-2023 range must keep at
/// least one file.
#[test]
fn test_filter_files_by_date_range_year_month() {
    let candidates: Vec<String> = ["data_2023-01.parquet", "data_2023-02.parquet", "data_2024-01.parquet"]
        .iter()
        .map(|name| name.to_string())
        .collect();

    let kept = filter_files_by_date_range(candidates, "2023-01-01", "2023-01-31").unwrap();

    assert!(!kept.is_empty());
}
/// Calls `rounded_timestamp` with each interval value the test exercises
/// (43200, 10080, 1440, 60 and 15) and checks the output is never empty.
#[test]
fn test_rounded_timestamp_all_intervals() {
    let timestamp = 1672531200;

    // Same order as the original checks: largest interval first.
    for interval in [43200, 10080, 1440, 60, 15] {
        let rounded = rounded_timestamp(timestamp, interval);
        assert!(!rounded.is_empty());
    }
}
/// Exercises date filtering with full dates first, then with a mix of
/// year-only, year-month and full-date file names; all must fall inside 2023.
#[test]
fn test_filter_files_by_date_range_none_day_case() {
    let (range_start, range_end) = ("2023-01-01", "2023-12-31");

    let full_dates = vec![String::from("data_2023-01-15.parquet"), String::from("data_2023-12-31.parquet")];
    let kept = filter_files_by_date_range(full_dates, range_start, range_end).unwrap();
    assert!(kept.len() >= 2);

    let mixed_formats = vec![
        String::from("data_2023.parquet"),
        String::from("data_2023-01.parquet"),
        String::from("data_2023-01-15.parquet"),
    ];
    let kept_mixed = filter_files_by_date_range(mixed_formats, range_start, range_end).unwrap();
    assert_eq!(kept_mixed.len(), 3);
}
/// Combining a batch whose `id` column is `List<Int64>` with a batch whose
/// `id` column is plain `Int64` must be rejected.
#[test]
fn test_convert_batch_schema_list_conversion_path() {
    // First batch: `id` is a list column holding [1] and [2].
    let element = Arc::new(Field::new("item", DataType::Int64, true));
    let list_schema = Schema::new(vec![Field::new("id", DataType::List(element), false)]);
    let mut builder = ListBuilder::new(Int64Builder::new());
    for value in [1, 2] {
        builder.values().append_value(value);
        builder.append(true);
    }
    let list_column = Arc::new(builder.finish());
    let list_batch = RecordBatch::try_new(Arc::new(list_schema), vec![list_column]).unwrap();

    // Second batch: `id` is a scalar column holding 3 and 4.
    let scalar_schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    let scalar_column = Arc::new(Int64Array::from(vec![3, 4]));
    let scalar_batch = RecordBatch::try_new(Arc::new(scalar_schema), vec![scalar_column]).unwrap();

    let outcome = combine_unique_batches(vec![list_batch], vec![scalar_batch], &["id".to_string()]);
    assert!(outcome.is_err());
}
/// `read_parquet_batches` must return an error both for a freshly created
/// temp file (no content written to it) and for a path that does not exist.
#[test]
fn test_read_parquet_batches_error_paths() {
    use tempfile::NamedTempFile;

    let mut sink = Vec::new();

    // Case 1: the file exists but nothing has been written to it.
    let blank_file = NamedTempFile::new().unwrap();
    let from_blank = read_parquet_batches(blank_file.path(), &mut sink);
    assert!(from_blank.is_err());

    // Case 2: the path does not exist at all.
    let missing = std::path::Path::new("/nonexistent/file.parquet");
    let from_missing = read_parquet_batches(missing, &mut sink);
    assert!(from_missing.is_err());
}
/// Converts a one-row `Int64` batch to JSON and checks the value.
///
/// Note: despite the test name, the column built here is a plain `Int64`.
#[test]
fn test_record_batches_to_json_unsupported_datatype_warning() {
    let column: ArrayRef = Arc::new(Int64Array::from(vec![1]));
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let rows = record_batches_to_json(&[batch]).unwrap();
    assert_eq!(rows[0]["id"], json!(1));
}
/// Compile-time check only: references `row_to_json` without calling it, so
/// the test fails to build if the helper is removed or made inaccessible.
#[test]
fn test_row_to_json_parquet_types() {
    let _ = crate::timon_engine::helpers::row_to_json;
}
/// A single file dated inside the requested range must be returned as-is.
///
/// The original discarded the result and asserted nothing. The expectation
/// mirrors `test_filter_files_by_date_range_none_day_case`, which uses the
/// same file name and the same range.
#[test]
fn test_filter_files_date_range_edge() {
    let files = vec!["data_2023-01-15.parquet".to_string()];

    let filtered = filter_files_by_date_range(files, "2023-01-01", "2023-12-31").unwrap();

    assert_eq!(filtered, vec!["data_2023-01-15.parquet"]);
}
/// A nullable `Utf8View` column must render present values as JSON strings
/// and the null entry as JSON `null`.
#[test]
fn test_record_batches_to_json_stringview_null_line45() {
    use crate::timon_engine::helpers::record_batches_to_json;
    use datafusion::arrow::array::StringViewBuilder;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use std::sync::Arc;

    // Layout: value, null, value.
    let mut views = StringViewBuilder::new();
    views.append_value("test");
    views.append_null();
    views.append_value("value");
    let column: Arc<dyn datafusion::arrow::array::Array> = Arc::new(views.finish());

    let schema = Arc::new(Schema::new(vec![Field::new("str", DataType::Utf8View, true)]));
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let rows = record_batches_to_json(&[batch]).unwrap();
    assert_eq!(rows[0]["str"], json!("test"));
    assert_eq!(rows[1]["str"], json!(null));
    assert_eq!(rows[2]["str"], json!("value"));
}
/// A struct row masked out by the validity buffer must be rendered as JSON
/// `null`.
#[test]
fn test_record_batches_to_json_struct_null_line129() {
    use crate::timon_engine::helpers::record_batches_to_json;
    use datafusion::arrow::array::{StringArray, StructArray};
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use std::sync::Arc;

    let child_fields = vec![Field::new("field", DataType::Utf8, false)];
    let child_values = Arc::new(StringArray::from(vec!["value1", "value2"]));
    let children = vec![child_values as Arc<dyn datafusion::arrow::array::Array>];

    // Row 0 is valid, row 1 is null at the struct level.
    let validity = datafusion::arrow::buffer::BooleanBuffer::from_iter(vec![true, false]);
    let nulls = datafusion::arrow::buffer::NullBuffer::new(validity);
    let masked = StructArray::new(child_fields.into_iter().map(Arc::new).collect(), children, Some(nulls));

    let column_field = Field::new("struct", masked.data_type().clone(), true);
    let schema = Arc::new(Schema::new(vec![column_field]));
    let column = Arc::new(masked) as Arc<dyn datafusion::arrow::array::Array>;
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let rows = record_batches_to_json(&[batch]).unwrap();
    assert_eq!(rows[1]["struct"], json!(null));
}
/// Calls `cleanup_old_files` with a path whose temporary directory has
/// already been dropped; the test passes as long as the call does not panic.
#[tokio::test]
async fn test_cleanup_old_files_error_line657() {
    use crate::timon_engine::helpers::cleanup_old_files;
    use tempfile::TempDir;

    let stale_path = {
        let scratch = TempDir::new().unwrap();
        let path = scratch.path().join("data_2020-01-01.parquet");
        std::fs::write(&path, b"test").unwrap();
        // `scratch` is dropped at the end of this block, before the cleanup
        // call below (presumably removing the directory and the file in it).
        path
    };

    let files = vec![stale_path];
    cleanup_old_files(&files).await;
}
/// End-to-end check: initialises the engine in a temp directory, creates a
/// database and table, then inserts two rows sharing the same `id` and
/// expects both inserts to succeed.
///
/// The results of the init/create calls are intentionally ignored; only the
/// two inserts are asserted.
#[tokio::test]
async fn test_row_to_json_parquet_field_paths_lines172_205() {
    use crate::timon_engine::{create_database, create_table, init_timon, insert};
    use tempfile::TempDir;

    let workspace = TempDir::new().unwrap();
    let root = workspace.path().to_str().unwrap();

    let _ = init_timon(root, 30, "parquet_user");
    let _ = create_database("parquet_test_db");

    let table_schema = r#"{"fields": [{"name": "id", "type": "int", "unique": true}, {"name": "name", "type": "string"}, {"name": "value", "type": "float"}, {"name": "date", "type": "string", "datetime": true}]}"#;
    let _ = create_table("parquet_test_db", "parquet_table", table_schema);

    // First insert writes the row.
    let first_rows = r#"[{"id": 1, "name": "test", "value": 10.5, "date": "2023-01-01 10:00:00"}]"#;
    assert!(insert("parquet_test_db", "parquet_table", first_rows).is_ok());

    // Second insert reuses the same `id`, which the schema marks as unique.
    let second_rows = r#"[{"id": 1, "name": "test2", "value": 20.5, "date": "2023-01-01 11:00:00"}]"#;
    assert!(insert("parquet_test_db", "parquet_table", second_rows).is_ok());
}
/// Smoke test: combines a full (`id`, `name`) batch with a partial (`id`
/// only) batch.
///
/// The original ended with an empty `if result.is_err() {}` branch and an
/// `else { assert!(result.is_ok()) }` that could never fail. Either outcome
/// is accepted here, so the result is discarded explicitly; the test only
/// fails if `combine_unique_batches` panics.
#[test]
fn test_convert_batch_schema_missing_column_line633() {
    use crate::timon_engine::helpers::combine_unique_batches;
    use datafusion::arrow::array::{Int64Array, StringArray};
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use std::sync::Arc;

    let full_schema = Schema::new(vec![Field::new("id", DataType::Int64, false), Field::new("name", DataType::Utf8, false)]);
    let id_array1 = Arc::new(Int64Array::from(vec![1]));
    let name_array1 = Arc::new(StringArray::from(vec!["test1"]));
    let full_batch = RecordBatch::try_new(Arc::new(full_schema), vec![id_array1, name_array1]).unwrap();

    // The second batch lacks the `name` column.
    let partial_schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    let id_array2 = Arc::new(Int64Array::from(vec![2]));
    let partial_batch = RecordBatch::try_new(Arc::new(partial_schema), vec![id_array2]).unwrap();

    let _ = combine_unique_batches(vec![full_batch], vec![partial_batch], &["id".to_string()]);
}
/// An `Int32` column must convert to JSON without error.
///
/// Note: despite the name, this exercises the success path only.
#[test]
fn test_record_batches_to_json_int32_downcast_error() {
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let outcome = record_batches_to_json(&[batch]);
    assert!(outcome.is_ok(), "Should successfully convert Int32 array");
}
/// A `Float64` column must convert to JSON without error.
///
/// Note: despite the name, this exercises the success path only.
#[test]
fn test_record_batches_to_json_float64_downcast_error() {
    let column: ArrayRef = Arc::new(Float64Array::from(vec![1.5, 2.5, 3.5]));
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Float64, false)]));
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let outcome = record_batches_to_json(&[batch]);
    assert!(outcome.is_ok(), "Should successfully convert Float64 array");
}
/// A `Utf8` column must convert to JSON without error.
///
/// Note: despite the name, this exercises the success path only.
#[test]
fn test_record_batches_to_json_string_downcast_error() {
    let column: ArrayRef = Arc::new(StringArray::from(vec!["Alice", "Bob"]));
    let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, false)]));
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let outcome = record_batches_to_json(&[batch]);
    assert!(outcome.is_ok(), "Should successfully convert String array");
}
/// A `Boolean` column must convert to JSON without error.
///
/// Note: despite the name, this exercises the success path only.
#[test]
fn test_record_batches_to_json_boolean_downcast_error() {
    let column: ArrayRef = Arc::new(BooleanArray::from(vec![true, false, true]));
    let schema = Arc::new(Schema::new(vec![Field::new("active", DataType::Boolean, false)]));
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let outcome = record_batches_to_json(&[batch]);
    assert!(outcome.is_ok(), "Should successfully convert Boolean array");
}
/// A millisecond timestamp column without a timezone must convert to JSON
/// without error.
///
/// Note: despite the name, this exercises the success path only.
#[test]
fn test_record_batches_to_json_timestamp_ms_downcast_error() {
    let time_type = DataType::Timestamp(TimeUnit::Millisecond, None);
    let schema = Arc::new(Schema::new(vec![Field::new("time", time_type, false)]));
    let column: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![1609459200000, 1609545600000]));
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let outcome = record_batches_to_json(&[batch]);
    assert!(outcome.is_ok(), "Should successfully convert Timestamp array");
}
/// A millisecond timestamp column tagged with the `UTC` timezone must convert
/// to JSON without error.
///
/// Note: despite the name, this exercises the success path only.
#[test]
fn test_record_batches_to_json_timestamp_ms_with_tz_downcast_error() {
    let time_type = DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into()));
    let schema = Arc::new(Schema::new(vec![Field::new("time", time_type, false)]));

    let instants = TimestampMillisecondArray::from(vec![1609459200000, 1609545600000]).with_timezone("UTC");
    let column: ArrayRef = Arc::new(instants);
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let outcome = record_batches_to_json(&[batch]);
    assert!(outcome.is_ok(), "Should successfully convert Timestamp with timezone");
}
/// A single-row, `UTC`-tagged millisecond timestamp column must convert to
/// JSON without error.
///
/// Note: despite the name, the fixture value (1609459200000) is an ordinary
/// timestamp, and only the success path is asserted.
#[test]
fn test_record_batches_to_json_timestamp_ms_invalid_value() {
    let time_type = DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into()));
    let schema = Arc::new(Schema::new(vec![Field::new("time", time_type, false)]));

    let instants = TimestampMillisecondArray::from(vec![1609459200000]).with_timezone("UTC");
    let column: ArrayRef = Arc::new(instants);
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let outcome = record_batches_to_json(&[batch]);
    assert!(outcome.is_ok(), "Should handle timestamp conversion");
}
/// A nanosecond timestamp column without a timezone must convert to JSON
/// without error.
///
/// Note: despite the name, this exercises the success path only.
#[test]
fn test_record_batches_to_json_timestamp_ns_downcast_error() {
    let time_type = DataType::Timestamp(TimeUnit::Nanosecond, None);
    let schema = Arc::new(Schema::new(vec![Field::new("time", time_type, false)]));
    let column: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![1609459200000000000, 1609545600000000000]));
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let outcome = record_batches_to_json(&[batch]);
    assert!(outcome.is_ok(), "Should successfully convert Timestamp nanosecond array");
}
/// A nanosecond timestamp column tagged with the `UTC` timezone must convert
/// to JSON without error.
///
/// Note: despite the name, this exercises the success path only.
#[test]
fn test_record_batches_to_json_timestamp_ns_with_tz_downcast_error() {
    let time_type = DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()));
    let schema = Arc::new(Schema::new(vec![Field::new("time", time_type, false)]));

    let instants = TimestampNanosecondArray::from(vec![1609459200000000000]).with_timezone("UTC");
    let column: ArrayRef = Arc::new(instants);
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let outcome = record_batches_to_json(&[batch]);
    assert!(outcome.is_ok(), "Should successfully convert Timestamp nanosecond with timezone");
}
/// A two-row `List<Int64>` column must convert to JSON without error.
///
/// Note: despite the name, this exercises the success path only.
#[test]
fn test_record_batches_to_json_list_downcast_error() {
    let element = Arc::new(Field::new("item", DataType::Int64, true));
    let schema = Arc::new(Schema::new(vec![Field::new("values", DataType::List(element), false)]));

    let mut builder = ListBuilder::new(Int64Builder::new());
    builder.append_value([Some(1), Some(2), Some(3)]);
    builder.append_value([Some(4), Some(5)]);
    let column: ArrayRef = Arc::new(builder.finish());

    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();
    let outcome = record_batches_to_json(&[batch]);
    assert!(outcome.is_ok(), "Should successfully convert List array");
}
/// A two-row `List<Utf8>` column must convert to JSON without error.
///
/// Note: despite the name, this exercises the success path only.
#[test]
fn test_record_batches_to_json_list_string_downcast_error() {
    let element = Arc::new(Field::new("item", DataType::Utf8, true));
    let schema = Arc::new(Schema::new(vec![Field::new("names", DataType::List(element), false)]));

    let mut builder = ListBuilder::new(StringBuilder::new());
    builder.append_value([Some("Alice"), Some("Bob")]);
    builder.append_value([Some("Charlie")]);
    let column: ArrayRef = Arc::new(builder.finish());

    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();
    let outcome = record_batches_to_json(&[batch]);
    assert!(outcome.is_ok(), "Should successfully convert List of strings");
}
/// A single-row `List<Int64>` column must convert to JSON without error.
///
/// Note: despite the name, this exercises the success path only.
#[test]
fn test_record_batches_to_json_list_int64_downcast_error() {
    let element = Arc::new(Field::new("item", DataType::Int64, true));
    let schema = Arc::new(Schema::new(vec![Field::new("numbers", DataType::List(element), false)]));

    let mut builder = ListBuilder::new(Int64Builder::new());
    builder.append_value([Some(10), Some(20)]);
    let column: ArrayRef = Arc::new(builder.finish());

    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();
    let outcome = record_batches_to_json(&[batch]);
    assert!(outcome.is_ok(), "Should successfully convert List of Int64");
}
/// A single-row `List<Float64>` column must convert to JSON without error.
///
/// Note: despite the name, this exercises the success path only.
#[test]
fn test_record_batches_to_json_list_float64_downcast_error() {
    let element = Arc::new(Field::new("item", DataType::Float64, true));
    let schema = Arc::new(Schema::new(vec![Field::new("scores", DataType::List(element), false)]));

    let mut builder = ListBuilder::new(Float64Builder::new());
    builder.append_value([Some(1.5), Some(2.5)]);
    let column: ArrayRef = Arc::new(builder.finish());

    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();
    let outcome = record_batches_to_json(&[batch]);
    assert!(outcome.is_ok(), "Should successfully convert List of Float64");
}
/// A single-row `List<Boolean>` column must convert to JSON without error.
///
/// Note: despite the name, this exercises the success path only.
#[test]
fn test_record_batches_to_json_list_boolean_downcast_error() {
    let element = Arc::new(Field::new("item", DataType::Boolean, true));
    let schema = Arc::new(Schema::new(vec![Field::new("flags", DataType::List(element), false)]));

    let mut builder = ListBuilder::new(BooleanBuilder::new());
    builder.append_value([Some(true), Some(false)]);
    let column: ArrayRef = Arc::new(builder.finish());

    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();
    let outcome = record_batches_to_json(&[batch]);
    assert!(outcome.is_ok(), "Should successfully convert List of Boolean");
}
/// A two-row struct column (`id`, `name`) built via `StructArray::from` must
/// convert to JSON without error.
///
/// Note: despite the name, this exercises the success path only.
#[test]
fn test_record_batches_to_json_struct_downcast_error() {
    let person_fields = vec![Field::new("id", DataType::Int64, false), Field::new("name", DataType::Utf8, false)];
    let person_type = DataType::Struct(person_fields.clone().into());
    let schema = Arc::new(Schema::new(vec![Field::new("person", person_type, false)]));

    let ids: ArrayRef = Arc::new(Int64Array::from(vec![1, 2]));
    let names: ArrayRef = Arc::new(StringArray::from(vec!["Alice", "Bob"]));
    // Pair each child field with its backing array.
    let people = StructArray::from(vec![
        (Arc::new(person_fields[0].clone()), ids),
        (Arc::new(person_fields[1].clone()), names),
    ]);

    let batch = RecordBatch::try_new(schema, vec![Arc::new(people)]).unwrap();
    let outcome = record_batches_to_json(&[batch]);
    assert!(outcome.is_ok(), "Should successfully convert Struct array");
}
/// An `Int64` column must convert to JSON without error.
///
/// Note: despite the test name, the column built here is a plain `Int64`.
#[test]
fn test_record_batches_to_json_unsupported_type_handling() {
    let column: ArrayRef = Arc::new(Int64Array::from(vec![1, 2]));
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let outcome = record_batches_to_json(&[batch]);
    assert!(outcome.is_ok(), "Should handle conversion");
}
/// An `Int64` column must convert to JSON without error.
///
/// Note: despite the name, this exercises the success path only.
#[test]
fn test_record_batches_to_json_conversion_error() {
    let column: ArrayRef = Arc::new(Int64Array::from(vec![1, 2]));
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let outcome = record_batches_to_json(&[batch]);
    assert!(outcome.is_ok(), "Should successfully convert");
}
/// Combining two batches whose `value` column has different types
/// (`Float64` vs `Utf8`) must fail.
#[test]
fn test_combine_unique_batches_with_incompatible_schemas() {
    // Local side: `value` is Float64.
    let local_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Float64, false),
    ]));
    let local_columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from(vec![1, 2])), Arc::new(Float64Array::from(vec![10.0, 20.0]))];
    let local_batch = RecordBatch::try_new(local_schema, local_columns).unwrap();

    // Remote side: `value` is Utf8.
    let s3_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Utf8, false),
    ]));
    let s3_columns: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(vec![3, 4])),
        Arc::new(StringArray::from(vec!["test1", "test2"])),
    ];
    let s3_batch = RecordBatch::try_new(s3_schema, s3_columns).unwrap();

    let outcome = combine_unique_batches(vec![local_batch], vec![s3_batch], &["id".to_string()]);
    assert!(outcome.is_err(), "Combining batches with incompatible types should fail");
}
/// Combining two batches that share a schema must succeed and deduplicate on
/// `id`: ids {1, 2} and {2, 3} yield a single batch of three rows.
#[test]
fn test_combine_unique_batches_with_compatible_schemas() {
    let shared_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
    ]));

    let local_columns: Vec<ArrayRef> = vec![Arc::new(Int64Array::from(vec![1, 2])), Arc::new(StringArray::from(vec!["Alice", "Bob"]))];
    let local_batch = RecordBatch::try_new(shared_schema.clone(), local_columns).unwrap();

    // `id` 2 appears on both sides.
    let s3_columns: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(vec![2, 3])),
        Arc::new(StringArray::from(vec!["Bob_Updated", "Charlie"])),
    ];
    let s3_batch = RecordBatch::try_new(shared_schema.clone(), s3_columns).unwrap();

    let outcome = combine_unique_batches(vec![local_batch], vec![s3_batch], &["id".to_string()]);
    assert!(outcome.is_ok(), "Combining batches with compatible schemas should succeed");

    let merged = outcome.unwrap();
    assert_eq!(merged.len(), 1, "Should produce one merged batch");
    assert_eq!(merged[0].num_rows(), 3, "Should have 3 unique rows (1, 2, 3)");
}