use super::super::super::timon_engine::db_manager::{DataFusionOutput, DatabaseManager};
use chrono;
use chrono::Datelike;
use serde_json::json;
use std::collections::{HashMap, HashSet};
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tempfile::TempDir;
use tokio::runtime::Runtime;
fn create_temp_dir() -> PathBuf {
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
let temp_dir = std::env::temp_dir().join(format!("test_db_{}", timestamp));
fs::create_dir_all(&temp_dir).expect("Failed to create temp directory");
temp_dir
}
fn cleanup_temp_dir(path: PathBuf) {
fs::remove_dir_all(path).expect("Failed to clean up temp directory");
}
#[test]
fn test_create_and_list_databases() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "ahmed_test");
assert!(db_manager.create_database("test_db").is_ok());
let databases = db_manager.list_databases().unwrap();
assert_eq!(databases, vec!["test_db".to_string()]);
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_create_and_list_tables() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "ahmed_test");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"id": {"type": "int", "required": true},
"name": {"type": "string", "required": true}
});
assert!(db_manager.create_table("test_db", "test_table", &schema.to_string()).is_ok());
let tables = db_manager.list_tables("test_db").unwrap();
assert_eq!(tables, vec!["test_table".to_string()]);
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_insert_and_query_data() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "ahmed_test");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"id": {"type": "int", "required": true},
"name": {"type": "string", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let data = json!([
{"date": "2025.02.20 10:15:00", "id": 1, "name": "Alice"},
{"date": "2025.02.20 10:20:00", "id": 2, "name": "Bob"}
]);
assert!(db_manager.insert("test_db", "test_table", &data.to_string()).is_ok());
let rt = Runtime::new().unwrap();
let result = rt
.block_on(db_manager.query("test_db", "SELECT * FROM test_table", None, true, None))
.unwrap();
match result {
DataFusionOutput::Json(json_result) => {
assert_eq!(json_result.as_array().unwrap().len(), 2);
let first = &json_result[0];
assert_eq!(first["date"], 1740046500);
assert_eq!(first["id"], 1);
assert_eq!(first["name"], "Alice");
let second = &json_result[1];
assert_eq!(second["date"], 1740046800);
assert_eq!(second["id"], 2);
assert_eq!(second["name"], "Bob");
}
_ => panic!("Expected JSON output"),
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_schema_validation() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "ahmed_test");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"id": {"type": "int", "required": true},
"name": {"type": "string", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let valid_data = json!([{"date": "2025.02.20 10:15:00", "id": 1, "name": "Alice"}]);
assert!(db_manager.insert("test_db", "test_table", &valid_data.to_string()).is_ok());
let invalid_data = json!([{"date": "2025.02.20 10:15:00", "id": 1}]);
assert!(db_manager.insert("test_db", "test_table", &invalid_data.to_string()).is_err());
let invalid_data = json!([{"date": "2025.02.20 10:15:00", "id": "not_an_int", "name": "Alice"}]);
assert!(db_manager.insert("test_db", "test_table", &invalid_data.to_string()).is_err());
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_delete_database_and_table() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "ahmed_test");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"id": {"type": "int", "required": true},
"name": {"type": "string", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
assert!(db_manager.delete_table("test_db", "test_table").is_ok());
assert!(db_manager.delete_database("test_db").is_ok());
let databases = db_manager.list_databases().unwrap();
assert!(databases.is_empty());
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_simple_join_query_without_alias() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "ahmed_test");
db_manager.create_database("test_db").unwrap();
let schema1 = json!({
"id": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
let schema2 = json!({
"id": {"type": "int", "required": true, "unique": true, "datetime": true},
"desc": {"type": "string", "required": true}
});
db_manager.create_table("test_db", "table1", &schema1.to_string()).unwrap();
db_manager.create_table("test_db", "table2", &schema2.to_string()).unwrap();
let data1 = json!([
{"id": "2025.02.20 10:15:00", "value": 10},
{"id": "2025.02.20 10:20:00", "value": 20}
]);
let data2 = json!([
{"id": "2025.02.20 10:15:00", "desc": "foo"},
{"id": "2025.02.20 10:25:00", "desc": "bar"}
]);
db_manager.insert("test_db", "table1", &data1.to_string()).unwrap();
db_manager.insert("test_db", "table2", &data2.to_string()).unwrap();
let rt = Runtime::new().unwrap();
let result = rt
.block_on(db_manager.query(
"test_db",
"SELECT table1.id, table1.value, table2.desc FROM table1 JOIN table2 ON table1.id = table2.id",
None,
true,
None,
))
.unwrap();
match result {
DataFusionOutput::Json(json_result) => {
assert_eq!(
json_result,
json!([
{"id": 1740046500, "value": 10, "desc": "foo"}
])
);
}
_ => panic!("Expected JSON output"),
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_join_query_with_alias_and_group_by() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "ahmed_test");
db_manager.create_database("test_db").unwrap();
let schema1 = json!({
"id": {"type": "int", "required": true, "unique": true, "datetime": true},
"cat": {"type": "string", "required": true},
"value": {"type": "int", "required": true}
});
let schema2 = json!({
"id": {"type": "int", "required": true, "unique": true, "datetime": true},
"cat": {"type": "string", "required": true},
"score": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "t1", &schema1.to_string()).unwrap();
db_manager.create_table("test_db", "t2", &schema2.to_string()).unwrap();
let data1 = json!([
{"id": "2025.02.20 10:15:00", "cat": "A", "value": 10},
{"id": "2025.02.20 10:20:00", "cat": "B", "value": 20}
]);
let data2 = json!([
{"id": "2025.02.20 10:15:00", "cat": "A", "score": 100},
{"id": "2025.02.20 10:20:00", "cat": "B", "score": 200}
]);
db_manager.insert("test_db", "t1", &data1.to_string()).unwrap();
db_manager.insert("test_db", "t2", &data2.to_string()).unwrap();
let rt = Runtime::new().unwrap();
let result = rt
.block_on(db_manager.query(
"test_db",
"SELECT a.cat, AVG(a.value) as avg_value, AVG(b.score) as avg_score \
FROM t1 a JOIN t2 b ON a.id = b.id GROUP BY a.cat",
None,
true,
None,
))
.unwrap();
match result {
DataFusionOutput::Json(json_result) => {
assert!(json_result.as_array().unwrap().iter().any(|row| row["cat"] == "A"));
assert!(json_result.as_array().unwrap().iter().any(|row| row["cat"] == "B"));
}
_ => panic!("Expected JSON output"),
}
cleanup_temp_dir(temp_dir);
}
fn insert_activitydetails(db_manager: &mut DatabaseManager) {
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"user": {"type": "string", "required": true}
});
db_manager.create_table("test_db", "activitydetails", &schema.to_string()).unwrap();
let data = json!([
{"date": "2025.05.07 10:00:00", "user": "A"}, {"date": "2025.05.07 12:00:00", "user": "B"}, {"date": "2025.05.08 10:00:00", "user": "C"}, {"date": "2025.05.09 10:00:00", "user": "D"} ]);
db_manager.insert("test_db", "activitydetails", &data.to_string()).unwrap();
}
#[test]
fn test_date_range_query_hourly_bucket() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 60, "ahmed_test");
db_manager.create_database("test_db").unwrap();
insert_activitydetails(&mut db_manager);
let rt = Runtime::new().unwrap();
let result = rt
.block_on(db_manager.query(
"test_db",
"SELECT COUNT(*) AS total FROM activitydetails WHERE date BETWEEN 1746608400 AND 1746702000",
None,
true,
None,
))
.unwrap();
match result {
DataFusionOutput::Json(json_result) => {
assert_eq!(json_result[0]["total"], 3);
}
_ => panic!("Expected JSON output"),
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_date_range_query_daily_bucket() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 1440, "ahmed_test");
db_manager.create_database("test_db").unwrap();
insert_activitydetails(&mut db_manager);
let rt = Runtime::new().unwrap();
let result = rt
.block_on(db_manager.query(
"test_db",
"SELECT COUNT(*) AS total FROM activitydetails WHERE date BETWEEN 1746566400 AND 1746652799",
None,
true,
None,
))
.unwrap();
match result {
DataFusionOutput::Json(json_result) => {
assert_eq!(json_result[0]["total"], 2);
}
_ => panic!("Expected JSON output"),
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_date_range_query_weekly_bucket() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 10080, "ahmed_test");
db_manager.create_database("test_db").unwrap();
insert_activitydetails(&mut db_manager);
let rt = Runtime::new().unwrap();
let result = rt
.block_on(db_manager.query(
"test_db",
"SELECT COUNT(*) AS total FROM activitydetails WHERE date BETWEEN 1746566400 AND 1747171199",
None,
true,
None,
))
.unwrap();
match result {
DataFusionOutput::Json(json_result) => {
assert_eq!(json_result[0]["total"], 4);
}
_ => panic!("Expected JSON output"),
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_date_range_query_monthly_bucket() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "ahmed_test");
db_manager.create_database("test_db").unwrap();
insert_activitydetails(&mut db_manager);
let rt = Runtime::new().unwrap();
let result = rt
.block_on(db_manager.query(
"test_db",
"SELECT COUNT(*) AS total FROM activitydetails WHERE date BETWEEN 1746038400 AND 1748716799",
None,
true,
None,
))
.unwrap();
match result {
DataFusionOutput::Json(json_result) => {
assert_eq!(json_result[0]["total"], 4);
}
_ => panic!("Expected JSON output"),
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_database_manager_error_handling() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let result = std::panic::catch_unwind(|| {
DatabaseManager::new("", 30, "test_user");
});
assert!(result.is_ok() || result.is_err());
let result = std::panic::catch_unwind(|| {
DatabaseManager::new(&db_root, u32::MAX, "test_user");
});
assert!(result.is_ok() || result.is_err());
}
#[test]
fn test_create_database_error_scenarios() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let result = db_manager.create_database("");
assert!(result.is_ok() || result.is_err());
let result = db_manager.create_database("db@test");
assert!(result.is_ok() || result.is_err());
let result1 = db_manager.create_database("test_db");
assert!(result1.is_ok());
let result2 = db_manager.create_database("test_db");
assert!(result2.is_ok() || result2.is_err());
}
#[test]
fn test_create_table_error_scenarios() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let _ = db_manager.create_database("test_db");
let result = db_manager.create_table("test_db", "test_table", "invalid json");
assert!(result.is_err());
let invalid_schema = r#"{"field1": {"required": true}}"#;
let result = db_manager.create_table("test_db", "test_table", invalid_schema);
assert!(result.is_err());
let invalid_schema2 = r#"{"field1": {"type": "string", "required": "not_boolean"}}"#;
let result = db_manager.create_table("test_db", "test_table", invalid_schema2);
assert!(result.is_err());
let valid_schema = r#"{"field1": {"type": "string", "required": true}}"#;
let result = db_manager.create_table("nonexistent_db", "test_table", valid_schema);
assert!(result.is_err());
let result1 = db_manager.create_table("test_db", "test_table", valid_schema);
assert!(result1.is_ok());
let result2 = db_manager.create_table("test_db", "test_table", valid_schema);
assert!(result2.is_err());
}
#[test]
fn test_insert_error_scenarios() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let _ = db_manager.create_database("test_db");
let schema = r#"{"id": {"type": "int", "required": true}, "datetime": {"type": "string", "required": true}}"#;
let _ = db_manager.create_table("test_db", "test_table", schema);
let result = db_manager.insert("test_db", "test_table", "invalid json");
assert!(result.is_err());
let data = r#"[{"id": 1}]"#;
let result = db_manager.insert("test_db", "test_table", data);
assert!(result.is_err());
let schema_with_datetime = r#"{"id": {"type": "int"}, "date": {"type": "string", "datetime": true, "required": true}}"#;
let _ = db_manager.create_table("test_db", "test_table2", schema_with_datetime);
let data_no_datetime = r#"[{"id": 1}]"#;
let result = db_manager.insert("test_db", "test_table2", data_no_datetime);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Missing required datetime field"));
let data2 = r#"[{"id": 1, "datetime": "invalid_date"}]"#;
let result = db_manager.insert("test_db", "test_table", data2);
assert!(result.is_err());
let data3 = r#"[{"id": "not_a_number", "datetime": "2023.01.01 12:00:00"}]"#;
let result = db_manager.insert("test_db", "test_table", data3);
assert!(result.is_err());
let data4 = r#"[{"id": 1, "datetime": "2023.01.01 12:00:00", "extra_field": "value"}]"#;
let result = db_manager.insert("test_db", "test_table", data4);
assert!(result.is_err());
let data5 = r#"[{"id": 1, "datetime": "2023.01.01 12:00:00"}]"#;
let result = db_manager.insert("nonexistent_db", "test_table", data5);
assert!(result.is_err());
let result = db_manager.insert("test_db", "nonexistent_table", data5);
assert!(result.is_err());
}
#[test]
fn test_list_operations_error_scenarios() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let result = db_manager.list_tables("nonexistent_db");
assert!(result.is_err());
let metadata_path = format!("{}/metadata.json", db_root);
let _ = std::fs::write(&metadata_path, "invalid json content");
let result = db_manager.list_databases();
assert!(result.is_ok() || result.is_err());
}
#[test]
fn test_delete_operations_error_scenarios() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let result = db_manager.delete_database("nonexistent_db");
assert!(result.is_err());
let _ = db_manager.create_database("test_db");
let result = db_manager.delete_table("test_db", "nonexistent_table");
assert!(result.is_err());
let result = db_manager.delete_table("nonexistent_db", "test_table");
assert!(result.is_err());
}
#[tokio::test]
async fn test_query_error_scenarios() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let result = db_manager.query("nonexistent_db", "SELECT * FROM table", None, true, None).await;
assert!(result.is_err());
let result = db_manager.query("test_db", "INVALID SQL QUERY", None, true, None).await;
assert!(result.is_err());
let result = db_manager.query("test_db", "", None, true, None).await;
assert!(result.is_err());
}
#[test]
fn test_metadata_operations_error_scenarios() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let metadata_path = format!("{}/metadata.json", db_root);
let _ = std::fs::write(&metadata_path, "invalid json content");
let result = db_manager.list_databases();
assert!(result.is_ok() || result.is_err());
let _ = std::fs::write(&metadata_path, "");
let result = db_manager.list_databases();
assert!(result.is_ok() || result.is_err());
}
#[test]
fn test_schema_validation_edge_cases() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let _ = db_manager.create_database("test_db");
let complex_schema = r#"{
"id": {"type": "int", "required": true},
"metadata": {"type": "object", "required": false},
"tags": {"type": "array", "required": false},
"datetime": {"type": "string", "required": true}
}"#;
let result = db_manager.create_table("test_db", "complex_table", complex_schema);
assert!(result.is_ok() || result.is_err());
let union_schema = r#"{
"value": {"type": "int|string", "required": true},
"datetime": {"type": "string", "required": true}
}"#;
let result = db_manager.create_table("test_db", "union_table", union_schema);
assert!(result.is_ok() || result.is_err());
}
#[test]
fn test_file_system_error_scenarios() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let read_only_dir = format!("{}/readonly", db_root);
let _ = std::fs::create_dir(&read_only_dir);
let _ = std::fs::set_permissions(&read_only_dir, std::fs::Permissions::from_mode(0o444));
let result = db_manager.create_database("test_db");
assert!(result.is_ok() || result.is_err());
let _ = std::fs::set_permissions(&read_only_dir, std::fs::Permissions::from_mode(0o755));
}
#[test]
fn test_lock_acquisition_scenarios() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let _db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let handles: Vec<_> = (0..5)
.map(|i| {
std::thread::spawn({
let db_root = db_root.clone();
move || {
let mut db_manager = DatabaseManager::new(&db_root, 30, &format!("user_{}", i));
db_manager.update_metadata(&db_root)
}
})
})
.collect();
for handle in handles {
let result = handle.join();
assert!(result.is_ok()); }
}
#[tokio::test]
async fn test_complex_query_scenarios() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let result = db_manager
.query("test_db", "SELECT * FROM table1 JOIN table2 ON table1.id = table2.id", None, true, None)
.await;
assert!(result.is_err());
let result = db_manager
.query("test_db", "SELECT * FROM (SELECT * FROM table1) AS subquery", None, true, None)
.await;
assert!(result.is_err());
}
#[test]
fn test_parquet_file_operations_error_scenarios() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let result = db_manager.build_files_list("test_db", "test_table", None);
assert!(result.is_err());
let invalid_file = format!("{}/invalid.parquet", db_root);
let _ = std::fs::write(&invalid_file, "not a parquet file");
let result = db_manager.build_files_list("test_db", "test_table", None);
assert!(result.is_err());
}
#[test]
fn test_build_files_list_error_scenarios() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let result = db_manager.build_files_list("nonexistent_db", "test_table", None);
assert!(result.is_err());
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let _ = db_manager.create_database("test_db");
let result = db_manager.build_files_list("test_db", "nonexistent_table", None);
assert!(result.is_err());
let schema = r#"{"id": {"type": "int", "required": true}}"#;
let _ = db_manager.create_table("test_db", "test_table", schema);
let table_path = format!("{}/data/test_db/test_table", db_root);
let _ = std::fs::remove_dir_all(&table_path);
let result = db_manager.build_files_list("test_db", "test_table", None);
assert!(result.is_err());
}
#[test]
fn test_data_validation_edge_cases() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let _ = db_manager.create_database("test_db");
let schema = r#"{
"id": {"type": "int", "required": true},
"name": {"type": "string", "required": false},
"value": {"type": "float", "required": false},
"active": {"type": "bool", "required": false},
"datetime": {"type": "string", "required": true}
}"#;
let _ = db_manager.create_table("test_db", "validation_table", schema);
let data_with_null = r#"[{"id": 1, "name": null, "datetime": "2023.01.01 12:00:00"}]"#;
let result = db_manager.insert("test_db", "validation_table", data_with_null);
assert!(result.is_ok() || result.is_err());
let data_with_empty = r#"[{"id": 1, "name": "", "datetime": "2023.01.01 12:00:00"}]"#;
let result = db_manager.insert("test_db", "validation_table", data_with_empty);
assert!(result.is_ok() || result.is_err());
let data_with_zero = r#"[{"id": 0, "value": 0.0, "datetime": "2023.01.01 12:00:00"}]"#;
let result = db_manager.insert("test_db", "validation_table", data_with_zero);
assert!(result.is_ok() || result.is_err());
}
#[test]
fn test_concurrent_operations() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
for i in 0..5 {
let result = db_manager.create_database(&format!("db_{}", i));
assert!(result.is_ok());
}
let _ = db_manager.create_database("concurrent_db");
let schema = r#"{"id": {"type": "int", "required": true}, "datetime": {"type": "string", "required": true}}"#;
for i in 0..3 {
let result = db_manager.create_table("concurrent_db", &format!("table_{}", i), schema);
assert!(result.is_ok());
}
}
#[test]
fn test_memory_pressure_scenarios() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let _ = db_manager.create_database("large_db");
let schema =
r#"{"id": {"type": "int", "required": true}, "data": {"type": "string", "required": false}, "datetime": {"type": "string", "required": true}}"#;
let _ = db_manager.create_table("large_db", "large_table", schema);
for i in 0..50 {
let large_data = format!(r#"[{{"id": {}, "data": "{}", "datetime": "2023.01.01 12:00:00"}}]"#, i, "x".repeat(100)); let result = db_manager.insert("large_db", "large_table", &large_data);
assert!(result.is_ok() || result.is_err()); }
let result = db_manager.list_databases();
assert!(result.is_ok() || result.is_err());
let result = db_manager.list_tables("large_db");
assert!(result.is_ok() || result.is_err());
}
#[test]
fn test_edge_case_initialization() {
let temp_dir = TempDir::new().unwrap();
let long_path = format!("{}/{}", temp_dir.path().display(), "a".repeat(1000));
let _ = std::fs::create_dir_all(&long_path);
let result = std::panic::catch_unwind(|| {
DatabaseManager::new(&long_path, 30, "test_user");
});
assert!(result.is_ok() || result.is_err());
let special_path = format!("{}/{}", temp_dir.path().display(), "path@with#special$chars");
let _ = std::fs::create_dir_all(&special_path);
let result = std::panic::catch_unwind(|| {
DatabaseManager::new(&special_path, 30, "test_user");
});
assert!(result.is_ok() || result.is_err());
}
#[test]
fn test_metadata_corruption_scenarios() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let _ = db_manager.create_database("test_db");
let schema = r#"{"id": {"type": "int", "required": true}, "datetime": {"type": "string", "required": true}}"#;
let _ = db_manager.create_table("test_db", "test_table", schema);
let metadata_path = format!("{}/metadata.json", db_root);
let _ = std::fs::write(
&metadata_path,
"{\"databases\": {\"test_db\": {\"tables\": {\"test_table\": {\"path\": \"invalid/path\"}}}}}",
);
let result = db_manager.list_databases();
assert!(result.is_ok() || result.is_err());
let result = db_manager.list_tables("test_db");
assert!(result.is_ok() || result.is_err());
}
#[test]
fn test_file_permission_scenarios() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let _ = db_manager.create_database("test_db");
let schema = r#"{"id": {"type": "int", "required": true}, "datetime": {"type": "string", "required": true}}"#;
let _ = db_manager.create_table("test_db", "test_table", schema);
let data_dir = format!("{}/data", db_root);
let _ = std::fs::set_permissions(&data_dir, std::fs::Permissions::from_mode(0o444));
let result = db_manager.create_database("new_db");
assert!(result.is_ok() || result.is_err());
let _ = std::fs::set_permissions(&data_dir, std::fs::Permissions::from_mode(0o755));
}
#[test]
fn test_large_scale_operations() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
for i in 0..10 {
let db_name = format!("db_{}", i);
let _ = db_manager.create_database(&db_name);
for j in 0..5 {
let table_name = format!("table_{}", j);
let schema = r#"{"id": {"type": "int", "required": true}, "datetime": {"type": "string", "required": true}}"#;
let _ = db_manager.create_table(&db_name, &table_name, schema);
}
}
let result = db_manager.list_databases();
assert!(result.is_ok());
let databases = result.unwrap();
assert!(databases.len() >= 10);
for db_name in databases {
let result = db_manager.list_tables(&db_name);
assert!(result.is_ok());
let tables = result.unwrap();
assert_eq!(tables.len(), 5);
}
}
#[test]
fn test_error_recovery_scenarios() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let _ = db_manager.create_database("test_db");
let schema = r#"{"id": {"type": "int", "required": true}, "datetime": {"type": "string", "required": true}}"#;
let _ = db_manager.create_table("test_db", "test_table", schema);
let data = r#"[{"id": 1, "datetime": "2023.01.01 12:00:00"}]"#;
let _ = db_manager.insert("test_db", "test_table", data);
let metadata_path = format!("{}/metadata.json", db_root);
let _ = std::fs::write(&metadata_path, "{}");
let result = db_manager.list_databases();
assert!(result.is_ok() || result.is_err());
let _ = db_manager.create_database("test_db");
let _ = db_manager.create_table("test_db", "test_table", schema);
let result = db_manager.list_tables("test_db");
assert!(result.is_ok() || result.is_err());
}
#[tokio::test]
async fn test_complex_query_scenarios_advanced() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let _ = db_manager.create_database("complex_db");
let schema1 = r#"{"fields": [{"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "datetime", "type": "string"}]}"#;
let schema2 = r#"{"fields": [{"name": "user_id", "type": "int"}, {"name": "score", "type": "float"}, {"name": "datetime", "type": "string"}]}"#;
let _ = db_manager.create_table("complex_db", "users", schema1);
let _ = db_manager.create_table("complex_db", "scores", schema2);
let user_data = r#"[{"id": 1, "name": "Alice", "datetime": "2023.01.01 12:00:00"}, {"id": 2, "name": "Bob", "datetime": "2023.01.01 13:00:00"}]"#;
let score_data =
r#"[{"user_id": 1, "score": 95.5, "datetime": "2023.01.01 12:00:00"}, {"user_id": 2, "score": 87.2, "datetime": "2023.01.01 13:00:00"}]"#;
let _ = db_manager.insert("complex_db", "users", user_data);
let _ = db_manager.insert("complex_db", "scores", score_data);
let complex_queries = vec![
"SELECT u.name, s.score FROM users u JOIN scores s ON u.id = s.user_id",
"SELECT COUNT(*) FROM users WHERE datetime >= '2023.01.01 12:00:00'",
"SELECT AVG(score) FROM scores GROUP BY user_id",
"SELECT * FROM users ORDER BY id DESC LIMIT 1",
];
for sql_query in complex_queries {
let result = db_manager.query("complex_db", sql_query, None, true, None).await;
assert!(result.is_ok() || result.is_err());
}
}
#[tokio::test]
async fn test_join_query_error_scenarios() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let invalid_join_queries = vec![
"SELECT * FROM table1 JOIN table2 ON table1.id = table2.id",
"SELECT * FROM users JOIN nonexistent ON users.id = nonexistent.id",
"SELECT * FROM table1 JOIN table2 JOIN table3 ON table1.id = table2.id",
"SELECT * FROM users JOIN scores ON users.id = scores.user_id WHERE users.id > 100",
];
for sql_query in invalid_join_queries {
let result = db_manager.query("test_db", sql_query, None, true, None).await;
assert!(result.is_err());
}
}
#[tokio::test]
async fn test_query_with_time_range_filtering() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let _ = db_manager.create_database("time_db");
let schema = r#"{"fields": [{"name": "id", "type": "int"}, {"name": "value", "type": "float"}, {"name": "datetime", "type": "string"}]}"#;
let _ = db_manager.create_table("time_db", "time_data", schema);
let data = r#"[{"id": 1, "value": 10.5, "datetime": "2023.01.01 12:00:00"}, {"id": 2, "value": 20.0, "datetime": "2023.01.01 13:00:00"}, {"id": 3, "value": 30.0, "datetime": "2023.01.01 14:00:00"}]"#;
let _ = db_manager.insert("time_db", "time_data", data);
let time_range_queries = vec![
"SELECT * FROM time_data WHERE datetime >= '2023.01.01 12:00:00' AND datetime <= '2023.01.01 13:00:00'",
"SELECT COUNT(*) FROM time_data WHERE datetime > '2023.01.01 12:30:00'",
"SELECT AVG(value) FROM time_data WHERE datetime BETWEEN '2023.01.01 12:00:00' AND '2023.01.01 14:00:00'",
];
for sql_query in time_range_queries {
let result = db_manager.query("time_db", sql_query, None, true, None).await;
assert!(result.is_ok() || result.is_err());
}
}
#[tokio::test]
async fn test_query_with_empty_results() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let empty_result_queries = vec![
"SELECT * FROM nonexistent_table",
"SELECT * FROM test_table WHERE id > 1000",
"SELECT * FROM test_table WHERE datetime > '2024.01.01 00:00:00'",
];
for sql_query in empty_result_queries {
let result = db_manager.query("test_db", sql_query, None, true, None).await;
assert!(result.is_err());
}
}
#[tokio::test]
async fn test_dataframe_output_scenarios() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let _ = db_manager.create_database("df_db");
let schema = r#"{"fields": [{"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "value", "type": "float"}]}"#;
let _ = db_manager.create_table("df_db", "df_table", schema);
let data = r#"[{"id": 1, "name": "test1", "value": 10.5}, {"id": 2, "name": "test2", "value": 20.0}]"#;
let _ = db_manager.insert("df_db", "df_table", data);
let result = db_manager.query("df_db", "SELECT * FROM df_table", None, false, None).await;
assert!(result.is_ok() || result.is_err());
}
#[tokio::test]
async fn test_query_with_username_filtering() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let _ = db_manager.create_database("user_db");
let schema = r#"{"fields": [{"name": "id", "type": "int"}, {"name": "data", "type": "string"}, {"name": "datetime", "type": "string"}]}"#;
let _ = db_manager.create_table("user_db", "user_data", schema);
let data =
r#"[{"id": 1, "data": "user1_data", "datetime": "2023.01.01 12:00:00"}, {"id": 2, "data": "user2_data", "datetime": "2023.01.01 13:00:00"}]"#;
let _ = db_manager.insert("user_db", "user_data", data);
let result = db_manager
.query("user_db", "SELECT * FROM user_data", Some("test_user"), true, None)
.await;
assert!(result.is_ok() || result.is_err());
}
#[test]
fn test_insert_with_duplicate_records() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let _ = db_manager.create_database("duplicate_db");
let schema = r#"{"fields": [{"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "datetime", "type": "string"}]}"#;
let _ = db_manager.create_table("duplicate_db", "duplicate_table", schema);
let data1 = r#"[{"id": 1, "name": "Alice", "datetime": "2023.01.01 12:00:00"}]"#;
let data2 = r#"[{"id": 1, "name": "Alice", "datetime": "2023.01.01 12:00:00"}]"#; let data3 = r#"[{"id": 2, "name": "Bob", "datetime": "2023.01.01 13:00:00"}]"#;
let _ = db_manager.insert("duplicate_db", "duplicate_table", data1);
let _ = db_manager.insert("duplicate_db", "duplicate_table", data2);
let _ = db_manager.insert("duplicate_db", "duplicate_table", data3);
}
#[test]
fn test_insert_with_partitioning_edge_cases() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let _ = db_manager.create_database("partition_db");
let schema = r#"{"fields": [{"name": "id", "type": "int"}, {"name": "data", "type": "string"}, {"name": "datetime", "type": "string"}]}"#;
let _ = db_manager.create_table("partition_db", "partition_table", schema);
let edge_case_data = vec![
r#"[{"id": 1, "data": "edge1", "datetime": "2023.01.01 00:00:00"}]"#, r#"[{"id": 2, "data": "edge2", "datetime": "2023.01.01 23:59:59"}]"#, r#"[{"id": 3, "data": "edge3", "datetime": "2023.12.31 12:00:00"}]"#, r#"[{"id": 4, "data": "edge4", "datetime": "2023.01.01 12:30:00"}]"#, ];
for data in edge_case_data {
let result = db_manager.insert("partition_db", "partition_table", data);
assert!(result.is_ok() || result.is_err());
}
}
#[test]
fn test_schema_validation_comprehensive() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let _ = db_manager.create_database("schema_db");
let schemas = vec![
r#"{"fields": [{"name": "id", "type": "int"}, {"name": "nested", "type": "object", "properties": {"key": {"type": "string"}}}]}"#,
r#"{"fields": [{"name": "data", "type": "array"}, {"name": "tags", "type": "array"}]}"#,
r#"{"fields": [{"name": "metadata", "type": "object"}]}"#,
r#"{"fields": [{"name": "complex", "type": "object", "properties": {"nested": {"type": "object", "properties": {"deep": {"type": "string"}}}}}]}"#,
];
for (i, schema) in schemas.iter().enumerate() {
let table_name = format!("complex_table_{}", i);
let result = db_manager.create_table("schema_db", &table_name, schema);
assert!(result.is_ok() || result.is_err());
}
}
#[test]
fn test_data_validation_comprehensive() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let _ = db_manager.create_database("validation_db");
let schema = r#"{"fields": [{"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "value", "type": "float"}, {"name": "active", "type": "bool"}, {"name": "datetime", "type": "string"}]}"#;
let _ = db_manager.create_table("validation_db", "validation_table", schema);
let test_data = vec![
r#"[{"id": 1, "name": "test", "value": 10.5, "active": true, "datetime": "2023.01.01 12:00:00"}]"#,
r#"[{"id": 2, "name": null, "value": null, "active": false, "datetime": "2023.01.01 13:00:00"}]"#,
r#"[{"id": 3, "name": "special@chars", "value": 0.0, "active": true, "datetime": "2023.01.01 14:00:00"}]"#,
r#"[{"id": 4, "name": "unicode_测试", "value": -10.5, "active": false, "datetime": "2023.01.01 15:00:00"}]"#,
];
for data in test_data {
let result = db_manager.insert("validation_db", "validation_table", data);
assert!(result.is_ok() || result.is_err());
}
}
#[test]
fn test_metadata_operations_comprehensive() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let _ = db_manager.create_database("metadata_db");
let schema = r#"{"fields": [{"name": "id", "type": "int"}, {"name": "data", "type": "string"}]}"#;
let _ = db_manager.create_table("metadata_db", "metadata_table", schema);
}
#[test]
fn test_file_operations_comprehensive() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let _ = db_manager.create_database("file_db");
let schema = r#"{"fields": [{"name": "id", "type": "int"}, {"name": "data", "type": "string"}, {"name": "datetime", "type": "string"}]}"#;
let _ = db_manager.create_table("file_db", "file_table", schema);
let data = r#"[{"id": 1, "data": "file_data", "datetime": "2023.01.01 12:00:00"}]"#;
let _ = db_manager.insert("file_db", "file_table", data);
let files_list = db_manager.build_files_list("file_db", "file_table", None);
assert!(files_list.is_ok() || files_list.is_err());
let files_list_with_user = db_manager.build_files_list("file_db", "file_table", Some("test_user"));
assert!(files_list_with_user.is_ok() || files_list_with_user.is_err());
}
#[test]
fn test_error_handling_comprehensive() {
let temp_dir = TempDir::new().unwrap();
let db_root = temp_dir.path().to_str().unwrap().to_string();
let mut db_manager = DatabaseManager::new(&db_root, 30, "test_user");
let _ = db_manager.create_database("error_db");
let schema = r#"{"fields": [{"name": "id", "type": "int"}]}"#;
let _ = db_manager.create_table("error_db", "error_table", schema);
let invalid_data = vec![
r#"[{"id": 1, "name": "test"#, r#"[{"id": 1, name: "test"}]"#, r#"[{"id": 1, "name": "test",}]"#, r#"[{"id": 1, "name": "test", "extra": }]"#, r#"[{"id": "invalid", "name": "test"}]"#, ];
for data in invalid_data {
let result = db_manager.insert("error_db", "error_table", data);
assert!(result.is_ok() || result.is_err());
}
}
#[test]
fn test_create_table_nonexistent_database() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
let schema = r#"{"id": {"type": "int"}}"#;
let result = db_manager.create_table("nonexistent_db", "test_table", schema);
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("does not exist") || err_msg.contains("Database"));
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_delete_database_error_paths() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
let result = db_manager.delete_database("nonexistent_db");
assert!(result.is_err());
db_manager.create_database("test_db").unwrap();
let result = db_manager.delete_database("test_db");
assert!(result.is_ok());
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_delete_table_error_paths() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = r#"{"id": {"type": "int"}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let _ = db_manager.delete_table("test_db", "nonexistent_table");
let result = db_manager.delete_table("test_db", "test_table");
assert!(result.is_ok());
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_insert_datetime_format_parsing() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = r#"{"id": {"type": "int"}, "date": {"type": "string", "datetime": true, "required": true}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let formats = vec![
r#"[{"id": 1, "date": "2023.01.01 12:00:00"}]"#, r#"[{"id": 2, "date": "2023-01-01T12:00:00.000Z"}]"#, r#"[{"id": 3, "date": "2023-01-01T12:00:00Z"}]"#, r#"[{"id": 4, "date": "2023-01-01 12:00:00"}]"#, ];
for data in formats {
let result = db_manager.insert("test_db", "test_table", data);
let _ = result;
}
let invalid_data = r#"[{"id": 5, "date": "invalid-date"}]"#;
let result = db_manager.insert("test_db", "test_table", invalid_data);
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("Invalid datetime") || err_msg.contains("datetime"));
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_insert_validation_rules() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = r#"{"value": {"type": "int", "min": 0, "max": 100}, "date": {"type": "string", "datetime": true, "required": true}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let valid_data = r#"[{"value": 50, "date": "2023.01.01 12:00:00"}]"#;
let result = db_manager.insert("test_db", "test_table", valid_data);
let _ = result;
let invalid_data = r#"[{"value": 150, "date": "2023.01.01 12:00:00"}]"#;
let result = db_manager.insert("test_db", "test_table", invalid_data);
let _ = result;
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_insert_existing_records_update() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let schema =
r#"{"id": {"type": "int", "unique": true}, "name": {"type": "string"}, "date": {"type": "string", "datetime": true, "required": true}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let data1 = r#"[{"id": 1, "name": "Alice", "date": "2023.01.01 12:00:00"}]"#;
let result = db_manager.insert("test_db", "test_table", data1);
let _ = result;
let data2 = r#"[{"id": 1, "name": "Bob", "date": "2023.01.01 12:00:00"}]"#;
let result = db_manager.insert("test_db", "test_table", data2);
let _ = result;
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_list_databases_metadata_error() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let result = db_manager.list_databases();
assert!(result.is_ok());
let databases = result.unwrap();
assert!(databases.contains(&"test_db".to_string()));
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_metadata_write_error() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let metadata_path = format!("{}/metadata.json", storage_path);
if Path::new(&metadata_path).exists() {
fs::remove_file(&metadata_path).unwrap();
}
fs::set_permissions(&temp_dir, fs::Permissions::from_mode(0o555)).unwrap();
let _db_manager = DatabaseManager::new(storage_path, 30, "test_user");
fs::set_permissions(&temp_dir, fs::Permissions::from_mode(0o755)).unwrap();
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_create_database_metadata_save_error() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
fs::set_permissions(storage_path, fs::Permissions::from_mode(0o555)).unwrap();
let result = db_manager.create_database("test_db");
assert!(result.is_err());
fs::set_permissions(storage_path, fs::Permissions::from_mode(0o755)).unwrap();
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_create_table_database_not_exist() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
let schema = r#"{"id": {"type": "int"}}"#;
let result = db_manager.create_table("nonexistent_db", "test_table", schema);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("does not exist"));
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_list_databases_read_error() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
let metadata_path = format!("{}/metadata.json", storage_path);
fs::remove_file(&metadata_path).unwrap();
fs::set_permissions(&temp_dir, fs::Permissions::from_mode(0o555)).unwrap();
let result = db_manager.list_databases();
assert!(result.is_err());
fs::set_permissions(&temp_dir, fs::Permissions::from_mode(0o755)).unwrap();
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_delete_database_metadata_reload_error() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let metadata_path = format!("{}/metadata.json", storage_path);
fs::write(&metadata_path, "invalid json").unwrap();
let result = db_manager.delete_database("test_db");
assert!(result.is_err());
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_delete_database_remove_dir_error() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let db_path = format!("{}/data/test_db", storage_path);
fs::write(format!("{}/test_file", db_path), "test").unwrap();
fs::set_permissions(&db_path, fs::Permissions::from_mode(0o555)).unwrap();
let result = db_manager.delete_database("test_db");
let _ = result;
fs::set_permissions(&db_path, fs::Permissions::from_mode(0o755)).unwrap();
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_delete_table_metadata_reload_error() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = r#"{"id": {"type": "int"}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let metadata_path = format!("{}/metadata.json", storage_path);
fs::write(&metadata_path, "invalid json").unwrap();
let result = db_manager.delete_table("test_db", "test_table");
assert!(result.is_err());
let error_msg = result.unwrap_err().to_string();
assert!(error_msg.contains("metadata") || error_msg.contains("Failed to reload"));
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_delete_table_remove_dir_error() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = r#"{"id": {"type": "int"}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let table_path = format!("{}/data/test_db/test_table", storage_path);
fs::write(format!("{}/test_file", table_path), "test").unwrap();
fs::set_permissions(&table_path, fs::Permissions::from_mode(0o555)).unwrap();
let result = db_manager.delete_table("test_db", "test_table");
let _ = result;
fs::set_permissions(&table_path, fs::Permissions::from_mode(0o755)).unwrap();
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_insert_read_parquet_and_update() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = r#"{"id": {"type": "int", "unique": true}, "date": {"type": "int", "datetime": true, "required": true}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let data1 = r#"[{"id": 1, "date": "2023.01.01 12:00:00"}]"#;
db_manager.insert("test_db", "test_table", data1).unwrap();
let data2 = r#"[{"id": 1, "date": "2023.01.01 13:00:00"}]"#;
db_manager.insert("test_db", "test_table", data2).unwrap();
cleanup_temp_dir(temp_dir);
}
#[tokio::test]
async fn test_query_partition_limits() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = r#"{"id": {"type": "int"}, "date": {"type": "int", "datetime": true, "required": true}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let data = r#"[{"id": 1, "date": "2023.01.01 12:00:00"}, {"id": 2, "date": "2023.01.02 12:00:00"}]"#;
db_manager.insert("test_db", "test_table", data).unwrap();
let result = db_manager.query("test_db", "SELECT * FROM test_table", None, true, Some(1)).await;
let _ = result;
cleanup_temp_dir(temp_dir);
}
#[tokio::test]
async fn test_query_dataframe_output() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = r#"{"id": {"type": "int"}, "date": {"type": "int", "datetime": true, "required": true}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let data = r#"[{"id": 1, "date": "2023.01.01 12:00:00"}]"#;
db_manager.insert("test_db", "test_table", data).unwrap();
let result = db_manager.query("test_db", "SELECT * FROM test_table", None, false, None).await;
assert!(result.is_ok());
cleanup_temp_dir(temp_dir);
}
#[tokio::test]
async fn test_register_table_already_registered() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = r#"{"id": {"type": "int"}, "date": {"type": "int", "datetime": true, "required": true}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let data = r#"[{"id": 1, "date": "2023.01.01 12:00:00"}]"#;
db_manager.insert("test_db", "test_table", data).unwrap();
let result1 = db_manager.query("test_db", "SELECT * FROM test_table", None, true, None).await;
assert!(result1.is_ok());
let result2 = db_manager.query("test_db", "SELECT * FROM test_table", None, true, None).await;
assert!(result2.is_ok());
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_resolve_table_dir_errors() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
let result = db_manager.build_files_list("nonexistent_db", "test_table", None);
assert!(result.is_err());
db_manager.create_database("test_db").unwrap();
let schema = r#"{"id": {"type": "int"}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let result = db_manager.build_files_list("test_db", "test_table", Some("test_user"));
let _ = result;
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_build_files_list_errors() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = r#"{"id": {"type": "int"}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let table_path = format!("{}/data/test_db/test_table", storage_path);
fs::remove_dir_all(&table_path).unwrap();
let result = db_manager.build_files_list("test_db", "test_table", None);
assert!(result.is_err());
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_collect_files_recursive() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = r#"{"id": {"type": "int"}, "date": {"type": "int", "datetime": true, "required": true}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let data = r#"[{"id": 1, "date": "2023.01.01 12:00:00"}]"#;
db_manager.insert("test_db", "test_table", data).unwrap();
let result = db_manager.build_files_list("test_db", "test_table", None);
assert!(result.is_ok());
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_validate_unexpected_field() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = r#"{"id": {"type": "int"}, "date": {"type": "string", "datetime": true, "required": true}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let data = r#"[{"id": 1, "date": "2023.01.01 12:00:00", "extra": "field"}]"#;
let result = db_manager.insert("test_db", "test_table", data);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Unexpected field"));
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_validate_field_types() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = r#"{"id": {"type": "int"}, "name": {"type": "string"}, "value": {"type": "float"}, "active": {"type": "bool"}, "tags": {"type": "array"}, "date": {"type": "int", "datetime": true, "required": true}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let data = r#"[{"id": 1, "name": "test", "value": 1.5, "active": true, "tags": [1, 2], "date": "2023.01.01 12:00:00"}]"#;
let result = db_manager.insert("test_db", "test_table", data);
assert!(result.is_ok());
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_read_parquet_file_errors() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = r#"{"id": {"type": "int"}, "date": {"type": "int", "datetime": true, "required": true}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let data = r#"[{"id": 1, "date": "2023.01.01 12:00:00"}]"#;
db_manager.insert("test_db", "test_table", data).unwrap();
let files = db_manager.build_files_list("test_db", "test_table", None).unwrap();
assert!(!files.is_empty());
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_get_metadata_cached_miss() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let metadata_path = format!("{}/metadata.json", storage_path);
fs::write(&metadata_path, r#"{"databases": {}}"#).unwrap();
let result = db_manager.list_databases();
let _ = result;
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_update_metadata_lock_retry() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
let lock_path = format!("{}/metadata.lock", storage_path);
let _lock_file = fs::File::create(&lock_path).unwrap();
let result = db_manager.update_metadata(storage_path);
let _ = result;
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_create_table_metadata_reload_error() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let metadata_path = format!("{}/metadata.json", storage_path);
fs::write(&metadata_path, "invalid json").unwrap();
let schema = r#"{"id": {"type": "int"}}"#;
let result = db_manager.create_table("test_db", "test_table", schema);
assert!(result.is_err());
cleanup_temp_dir(temp_dir);
}
#[tokio::test]
async fn test_query_partition_limits_with_where() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = r#"{"id": {"type": "int"}, "date": {"type": "int", "datetime": true, "required": true}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let data = r#"[{"id": 1, "date": "2023.01.01 12:00:00"}, {"id": 2, "date": "2023.01.02 12:00:00"}]"#;
db_manager.insert("test_db", "test_table", data).unwrap();
let result = db_manager
.query("test_db", "SELECT * FROM test_table WHERE id > 0", None, true, Some(1))
.await;
let _ = result;
cleanup_temp_dir(temp_dir);
}
#[tokio::test]
async fn test_query_partition_limits_empty_partitions() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = r#"{"id": {"type": "int"}, "date": {"type": "int", "datetime": true, "required": true}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let result = db_manager.query("test_db", "SELECT * FROM test_table", None, true, Some(1)).await;
let _ = result;
cleanup_temp_dir(temp_dir);
}
#[tokio::test]
async fn test_register_table_double_check() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = r#"{"id": {"type": "int"}, "date": {"type": "int", "datetime": true, "required": true}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let data = r#"[{"id": 1, "date": "2023.01.01 12:00:00"}]"#;
db_manager.insert("test_db", "test_table", data).unwrap();
let handles: Vec<_> = (0..3)
.map(|_| {
let storage_path = storage_path.to_string();
std::thread::spawn(move || {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let db_manager = DatabaseManager::new(&storage_path, 30, "test_user");
db_manager.query("test_db", "SELECT * FROM test_table", None, true, None).await
})
})
})
.collect();
for handle in handles {
let _ = handle.join();
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_resolve_table_dir_group_path() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = r#"{"id": {"type": "int"}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let group_path = format!("{}/data/group/test_user/test_db/test_table", storage_path);
fs::create_dir_all(&group_path).unwrap();
let result = db_manager.build_files_list("test_db", "test_table", Some("test_user"));
let _ = result;
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_build_files_list_metadata_error() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let db_manager = DatabaseManager::new(storage_path, 30, "test_user");
let metadata_path = format!("{}/metadata.json", storage_path);
fs::write(&metadata_path, "invalid json").unwrap();
let result = db_manager.build_files_list("test_db", "test_table", None);
assert!(result.is_err());
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_build_files_list_base_dir_error() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = r#"{"id": {"type": "int"}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let metadata_path = format!("{}/metadata.json", storage_path);
let metadata = r#"{"databases": {"test_db": {"tables": {"test_table": {"path": "invalid/path", "schema": {}}}}}}"#;
fs::write(&metadata_path, metadata).unwrap();
let result = db_manager.build_files_list("test_db", "test_table", None);
assert!(result.is_err());
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_validate_unknown_type() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = r#"{"id": {"type": "int"}, "data": {"type": "object"}, "date": {"type": "int", "datetime": true, "required": true}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let data = r#"[{"id": 1, "data": {"key": "value"}, "date": "2023.01.01 12:00:00"}]"#;
let result = db_manager.insert("test_db", "test_table", data);
let _ = result;
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_read_parquet_file_record_error() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = r#"{"id": {"type": "int"}, "date": {"type": "int", "datetime": true, "required": true}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let data = r#"[{"id": 1, "date": "2023.01.01 12:00:00"}]"#;
db_manager.insert("test_db", "test_table", data).unwrap();
let files = db_manager.build_files_list("test_db", "test_table", None).unwrap();
for _file in files {
let _ = db_manager.build_files_list("test_db", "test_table", None);
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_get_metadata_cached_none_path() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let _ = db_manager.list_databases();
let metadata_path = format!("{}/metadata.json", storage_path);
fs::write(&metadata_path, r#"{"databases": {}}"#).unwrap();
let _ = db_manager.list_databases();
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_update_metadata_path_updates() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 30, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = r#"{"id": {"type": "int"}}"#;
db_manager.create_table("test_db", "test_table", schema).unwrap();
let new_storage = format!("{}/new_storage", temp_dir.to_str().unwrap());
fs::create_dir_all(&new_storage).unwrap();
let result = db_manager.update_metadata(&new_storage);
let _ = result;
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_atomic_file_insert_basic() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let data1 = json!([
{"date": "2025.12.01 10:00:00", "value": 100},
{"date": "2025.12.01 10:01:00", "value": 200}
]);
assert!(db_manager.insert("test_db", "test_table", &data1.to_string()).is_ok());
let data2 = json!([
{"date": "2025.12.01 10:00:00", "value": 150} ]);
assert!(db_manager.insert("test_db", "test_table", &data2.to_string()).is_ok());
let data3 = json!([
{"date": "2025.12.01 10:02:00", "value": 300}
]);
assert!(db_manager.insert("test_db", "test_table", &data3.to_string()).is_ok());
let rt = Runtime::new().unwrap();
let result = rt
.block_on(db_manager.query("test_db", "SELECT * FROM test_table ORDER BY date", None, true, None))
.unwrap();
match result {
DataFusionOutput::Json(json_result) => {
let records = json_result.as_array().unwrap();
assert_eq!(records.len(), 3, "Should have 3 records total");
assert_eq!(records[0]["value"], 150, "First record should be updated to 150");
assert_eq!(records[1]["value"], 200, "Second record should remain 200");
assert_eq!(records[2]["value"], 300, "Third record should be 300");
}
_ => panic!("Expected JSON output"),
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_concurrent_inserts_same_partition() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true},
"thread_id": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let num_threads = 10;
let records_per_thread = 50;
let base_timestamp = chrono::Utc::now().timestamp();
let handles = (0..num_threads)
.map(|thread_id| {
let storage_path = storage_path.to_string();
thread::spawn(move || {
let mut db_manager = DatabaseManager::new(&storage_path, 43200, "test_user");
let mut records = Vec::new();
for i in 0..records_per_thread {
let offset_seconds = (thread_id * records_per_thread + i) as i64;
let record_timestamp = base_timestamp + offset_seconds;
let record_date = chrono::DateTime::<chrono::Utc>::from_timestamp(record_timestamp, 0)
.unwrap()
.format("%Y.%m.%d %H:%M:%S")
.to_string();
records.push(json!({
"date": record_date,
"value": thread_id * 1000 + i,
"thread_id": thread_id
}));
}
let json_data = serde_json::to_string(&records).unwrap();
match db_manager.insert("test_db", "test_table", &json_data) {
Ok(_) => Ok(()),
Err(e) => Err(format!("{}", e)),
}
})
})
.collect::<Vec<_>>();
let mut errors = Vec::new();
for handle in handles {
match handle.join() {
Ok(Ok(_)) => {}
Ok(Err(e)) => errors.push(e.to_string()),
Err(e) => errors.push(format!("Thread panicked: {:?}", e)),
}
}
assert!(errors.is_empty(), "No errors should occur: {:?}", errors);
thread::sleep(Duration::from_millis(300));
let rt = Runtime::new().unwrap();
let result = rt
.block_on(db_manager.query("test_db", "SELECT * FROM test_table", None, true, None))
.unwrap();
match result {
DataFusionOutput::Json(json_result) => {
let records = json_result.as_array().unwrap();
let expected_count = num_threads * records_per_thread;
assert_eq!(
records.len(),
expected_count,
"Should have {} records, got {}",
expected_count,
records.len()
);
let mut seen_dates = std::collections::HashSet::new();
for record in records {
let date = record["date"].as_i64().unwrap();
assert!(seen_dates.insert(date), "Duplicate date found: {}", date);
}
}
_ => panic!("Expected JSON output"),
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_concurrent_updates_same_record() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true},
"updated_by": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let fixed_date = "2025.12.01 10:00:00";
let initial_data = json!([
{"date": fixed_date, "value": 0, "updated_by": 0}
]);
assert!(db_manager.insert("test_db", "test_table", &initial_data.to_string()).is_ok());
thread::sleep(Duration::from_millis(100));
let num_threads = 10;
let handles = (0..num_threads)
.map(|thread_id| {
let storage_path = storage_path.to_string();
let fixed_date = fixed_date.to_string();
thread::spawn(move || {
let mut db_manager = DatabaseManager::new(&storage_path, 43200, "test_user");
let data = json!([
{"date": fixed_date, "value": thread_id + 1, "updated_by": thread_id}
]);
let json_data = serde_json::to_string(&data).unwrap();
match db_manager.insert("test_db", "test_table", &json_data) {
Ok(_) => Ok(()),
Err(e) => Err(format!("{}", e)),
}
})
})
.collect::<Vec<_>>();
for handle in handles {
assert!(handle.join().unwrap().is_ok());
}
thread::sleep(Duration::from_millis(300));
let rt = Runtime::new().unwrap();
let result = rt
.block_on(db_manager.query("test_db", "SELECT * FROM test_table", None, true, None))
.unwrap();
match result {
DataFusionOutput::Json(json_result) => {
let records = json_result.as_array().unwrap();
assert_eq!(records.len(), 1, "Should have exactly 1 record, got {}", records.len());
let record = &records[0];
let value = record["value"].as_i64().unwrap();
let updated_by = record["updated_by"].as_i64().unwrap();
assert!(
value >= 1 && value <= num_threads as i64,
"Value should be between 1 and {}, got {}",
num_threads,
value
);
assert!(
updated_by >= 0 && updated_by < num_threads as i64,
"updated_by should be between 0 and {}, got {}",
num_threads - 1,
updated_by
);
}
_ => panic!("Expected JSON output"),
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_mixed_insert_update_concurrent() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true},
"operation": {"type": "string", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let initial_date1 = "2025.12.01 10:00:00";
let initial_date2 = "2025.12.01 10:01:00";
let initial_data = json!([
{"date": initial_date1, "value": 100, "operation": "initial"},
{"date": initial_date2, "value": 200, "operation": "initial"}
]);
assert!(db_manager.insert("test_db", "test_table", &initial_data.to_string()).is_ok());
thread::sleep(Duration::from_millis(100));
let base_timestamp = chrono::Utc::now().timestamp();
let num_insert_threads = 5;
let num_update_threads = 5;
let insert_handles = (0..num_insert_threads)
.map(|thread_id| {
let storage_path = storage_path.to_string();
thread::spawn(move || {
let mut db_manager = DatabaseManager::new(&storage_path, 43200, "test_user");
let record_date = chrono::DateTime::<chrono::Utc>::from_timestamp(base_timestamp + 100 + thread_id as i64, 0)
.unwrap()
.format("%Y.%m.%d %H:%M:%S")
.to_string();
let data = json!([
{"date": record_date, "value": thread_id + 1000, "operation": "insert"}
]);
let json_data = serde_json::to_string(&data).unwrap();
match db_manager.insert("test_db", "test_table", &json_data) {
Ok(_) => Ok(()),
Err(e) => Err(format!("{}", e)),
}
})
})
.collect::<Vec<_>>();
let update_handles = (0..num_update_threads)
.map(|thread_id| {
let storage_path = storage_path.to_string();
let update_date = if thread_id % 2 == 0 { initial_date1 } else { initial_date2 };
thread::spawn(move || {
let mut db_manager = DatabaseManager::new(&storage_path, 43200, "test_user");
let data = json!([
{"date": update_date, "value": thread_id + 2000, "operation": "update"}
]);
let json_data = serde_json::to_string(&data).unwrap();
match db_manager.insert("test_db", "test_table", &json_data) {
Ok(_) => Ok(()),
Err(e) => Err(format!("{}", e)),
}
})
})
.collect::<Vec<_>>();
for handle in insert_handles.into_iter().chain(update_handles) {
assert!(handle.join().unwrap().is_ok());
}
thread::sleep(Duration::from_millis(300));
let rt = Runtime::new().unwrap();
let result = rt
.block_on(db_manager.query("test_db", "SELECT * FROM test_table", None, true, None))
.unwrap();
match result {
DataFusionOutput::Json(json_result) => {
let records = json_result.as_array().unwrap();
assert_eq!(
records.len(),
7,
"Should have 7 records (2 initial + 5 new inserts), got {}",
records.len()
);
let mut seen_dates = HashSet::new();
for record in records {
let date = record["date"].as_i64().unwrap();
assert!(seen_dates.insert(date), "Duplicate date found: {}", date);
}
}
_ => panic!("Expected JSON output"),
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_metadata_atomic_write() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"id": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
thread::sleep(Duration::from_millis(100));
let num_threads = 10;
let storage_path = storage_path.to_string();
let handles = (0..num_threads)
.map(|_| {
let storage_path = storage_path.clone();
thread::spawn(move || {
let mut db_manager = DatabaseManager::new(&storage_path, 43200, "test_user");
db_manager.list_tables("test_db")
})
})
.collect::<Vec<_>>();
for handle in handles {
let result = handle.join().unwrap();
assert!(result.is_ok(), "Thread should see metadata: {:?}", result);
let tables = result.unwrap();
assert!(tables.contains(&"test_table".to_string()), "Should see test_table");
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_metadata_read_retry() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let storage_path = storage_path.to_string();
let handles = (0..5)
.map(|i| {
let storage_path = storage_path.clone();
thread::spawn(move || {
thread::sleep(Duration::from_millis(i * 10)); let mut db_manager = DatabaseManager::new(&storage_path, 43200, "test_user");
db_manager.list_databases()
})
})
.collect::<Vec<_>>();
for i in 0..5 {
let schema = json!({
"id": {"type": "int", "required": true}
});
let result = db_manager.create_table("test_db", &format!("table_{}", i), &schema.to_string());
if result.is_err() {
}
thread::sleep(Duration::from_millis(5));
}
for handle in handles {
let result = handle.join().unwrap();
assert!(result.is_ok(), "Thread should read metadata successfully");
let databases = result.unwrap();
assert!(databases.contains(&"test_db".to_string()));
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_partition_isolation() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true},
"partition": {"type": "string", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
thread::sleep(Duration::from_millis(100));
let num_partitions = 5;
let records_per_partition = 20;
let base_timestamp = chrono::Utc::now().timestamp();
let handles = (0..num_partitions)
.map(|partition_id| {
let storage_path = storage_path.to_string();
thread::spawn(move || {
let mut db_manager = DatabaseManager::new(&storage_path, 43200, "test_user");
let mut records = Vec::new();
let base_dt = chrono::DateTime::<chrono::Utc>::from_timestamp(base_timestamp, 0).unwrap();
let base_year = base_dt.year();
let base_month = base_dt.month();
for i in 0..records_per_partition {
let target_month = ((base_month as i32 + partition_id as i32 - 1) % 12) + 1;
let target_year = base_year + ((base_month as u32 + partition_id as u32 - 1) / 12) as i32;
let record_date = format!(
"{:04}.{:02}.{:02} {:02}:00:00",
target_year,
target_month,
1 + (i % 28) as u32, 10 + (i % 12) as u32
);
records.push(json!({
"date": record_date,
"value": partition_id * 1000 + i,
"partition": format!("partition_{}", partition_id)
}));
}
let json_data = serde_json::to_string(&records).unwrap();
match db_manager.insert("test_db", "test_table", &json_data) {
Ok(_) => Ok(()),
Err(e) => Err(format!("{}", e)),
}
})
})
.collect::<Vec<_>>();
for handle in handles {
assert!(handle.join().unwrap().is_ok());
}
thread::sleep(Duration::from_millis(300));
let rt = Runtime::new().unwrap();
let result = rt
.block_on(db_manager.query("test_db", "SELECT * FROM test_table", None, true, None))
.unwrap();
match result {
DataFusionOutput::Json(json_result) => {
let records = json_result.as_array().unwrap();
let expected_count = num_partitions * records_per_partition;
assert_eq!(
records.len(),
expected_count,
"Should have {} records, got {}",
expected_count,
records.len()
);
let mut partition_counts = HashMap::new();
for record in records {
let partition = record["partition"].as_str().unwrap();
*partition_counts.entry(partition.to_string()).or_insert(0) += 1;
}
for partition_id in 0..num_partitions {
let partition_name = format!("partition_{}", partition_id);
assert_eq!(
partition_counts.get(&partition_name),
Some(&records_per_partition),
"Partition {} should have {} records",
partition_name,
records_per_partition
);
}
}
_ => panic!("Expected JSON output"),
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_file_write_atomicity() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let num_inserts = 10;
for i in 0..num_inserts {
let data = json!([
{"date": format!("2025.12.01 {:02}:00:00", 10 + i), "value": i}
]);
assert!(db_manager.insert("test_db", "test_table", &data.to_string()).is_ok());
let rt = Runtime::new().unwrap();
let result = rt
.block_on(db_manager.query("test_db", "SELECT * FROM test_table", None, true, None))
.unwrap();
match result {
DataFusionOutput::Json(json_result) => {
let records = json_result.as_array().unwrap();
assert_eq!(
records.len(),
i + 1,
"After {} inserts, should have {} records, got {}",
i + 1,
i + 1,
records.len()
);
for record in records {
assert!(record["date"].is_number(), "Date should be a number");
assert!(record["value"].is_number(), "Value should be a number");
}
}
_ => panic!("Expected JSON output"),
}
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_mutex_locking_prevents_race_conditions() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true},
"thread_id": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let num_threads = 20;
let records_per_thread = 10;
let base_timestamp = chrono::Utc::now().timestamp();
let success_count = Arc::new(Mutex::new(0));
let handles = (0..num_threads)
.map(|thread_id| {
let storage_path = storage_path.to_string();
let success_count = Arc::clone(&success_count);
thread::spawn(move || {
let mut db_manager = DatabaseManager::new(&storage_path, 43200, "test_user");
let mut records = Vec::new();
for i in 0..records_per_thread {
let offset_seconds = (thread_id * records_per_thread + i) as i64;
let record_timestamp = base_timestamp + offset_seconds;
let record_date = chrono::DateTime::<chrono::Utc>::from_timestamp(record_timestamp, 0)
.unwrap()
.format("%Y.%m.%d %H:%M:%S")
.to_string();
records.push(json!({
"date": record_date,
"value": thread_id * 1000 + i,
"thread_id": thread_id
}));
}
let json_data = serde_json::to_string(&records).unwrap();
match db_manager.insert("test_db", "test_table", &json_data) {
Ok(_) => {
let mut count = success_count.lock().unwrap();
*count += 1;
}
Err(_) => {}
}
})
})
.collect::<Vec<_>>();
for handle in handles {
handle.join().unwrap();
}
thread::sleep(Duration::from_millis(500));
let final_count = *success_count.lock().unwrap();
assert_eq!(
final_count, num_threads,
"All {} threads should succeed, but only {} succeeded",
num_threads, final_count
);
let rt = Runtime::new().unwrap();
let result = rt
.block_on(db_manager.query("test_db", "SELECT * FROM test_table", None, true, None))
.unwrap();
match result {
DataFusionOutput::Json(json_result) => {
let records = json_result.as_array().unwrap();
let expected_count = num_threads * records_per_thread;
assert_eq!(
records.len(),
expected_count,
"Should have {} records, got {}",
expected_count,
records.len()
);
let mut seen_dates = HashSet::new();
for record in records {
let date = record["date"].as_i64().unwrap();
assert!(seen_dates.insert(date), "Duplicate date found: {}", date);
}
}
_ => panic!("Expected JSON output"),
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_atomic_operation_error_handling() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let valid_data = json!([
{"date": "2025.12.01 10:00:00", "value": 50}
]);
assert!(db_manager.insert("test_db", "test_table", &valid_data.to_string()).is_ok());
thread::sleep(Duration::from_millis(100));
let invalid_data = json!([
{"date": "invalid_date", "value": 100} ]);
let result = db_manager.insert("test_db", "test_table", &invalid_data.to_string());
assert!(result.is_err(), "Should fail with invalid date format");
thread::sleep(Duration::from_millis(100));
let rt = Runtime::new().unwrap();
let result = rt
.block_on(db_manager.query("test_db", "SELECT * FROM test_table", None, true, None))
.unwrap();
match result {
DataFusionOutput::Json(json_result) => {
let records = json_result.as_array().unwrap();
assert_eq!(records.len(), 1, "Should have 1 record (the one inserted before the error)");
}
_ => panic!("Expected JSON output"),
}
let valid_data2 = json!([
{"date": "2025.12.01 10:01:00", "value": 100}
]);
assert!(db_manager.insert("test_db", "test_table", &valid_data2.to_string()).is_ok());
thread::sleep(Duration::from_millis(100));
let rt = Runtime::new().unwrap();
let result = rt
.block_on(db_manager.query("test_db", "SELECT * FROM test_table", None, true, None))
.unwrap();
match result {
DataFusionOutput::Json(json_result) => {
let records = json_result.as_array().unwrap();
assert_eq!(records.len(), 2, "Should have 2 records after valid insert");
}
_ => panic!("Expected JSON output"),
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_cleanup_unused_locks_functionality() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
for i in 0..100 {
let timestamp = chrono::Utc::now().timestamp() + i;
let date = chrono::DateTime::<chrono::Utc>::from_timestamp(timestamp, 0)
.unwrap()
.format("%Y.%m.%d %H:%M:%S")
.to_string();
let data = json!([{"date": date, "value": i}]);
let _ = db_manager.insert("test_db", "test_table", &data.to_string());
}
thread::sleep(Duration::from_millis(100));
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_datafusion_output_debug_with_empty_dataframe() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let data = json!([{"date": "2025.12.01 10:00:00", "value": 1}]);
db_manager.insert("test_db", "test_table", &data.to_string()).unwrap();
let rt = Runtime::new().unwrap();
let result = rt.block_on(db_manager.query("test_db", "SELECT * FROM test_table", None, false, None));
match result {
Ok(output) => {
let debug_str = format!("{:?}", output);
assert!(debug_str.len() > 0, "Debug output should not be empty");
println!("Debug output length: {}", debug_str.len());
}
Err(e) => {
panic!("Query should succeed: {:?}", e);
}
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_metadata_serialization_error_handling() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let metadata_path = format!("{}/metadata.json", storage_path);
fs::write(&metadata_path, "{ invalid json }").unwrap();
let result = db_manager.list_databases();
match result {
Ok(_) => {
println!("System recovered from corrupted metadata");
}
Err(e) => {
println!("Expected error for corrupted metadata: {:?}", e);
assert!(e.to_string().contains("metadata") || e.to_string().contains("parse"));
}
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_metadata_read_error_paths() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let metadata_path = format!("{}/metadata.json", storage_path);
let metadata_file = fs::File::open(&metadata_path).unwrap();
let mut permissions = metadata_file.metadata().unwrap().permissions();
permissions.set_mode(0o444); fs::set_permissions(&metadata_path, permissions.clone()).unwrap();
let result = db_manager.create_database("test_db2");
let mut permissions = fs::metadata(&metadata_path).unwrap().permissions();
permissions.set_mode(0o644);
fs::set_permissions(&metadata_path, permissions).unwrap();
match result {
Ok(_) => {
println!("Operation succeeded despite read-only metadata (system-dependent)");
}
Err(e) => {
println!("Expected error for read-only metadata: {:?}", e);
}
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_cleanup_orphaned_temp_files_coverage() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let data_path = format!("{}/data", storage_path);
fs::create_dir_all(&data_path).unwrap();
let old_temp_file = format!("{}/old_data.parquet.tmp", data_path);
fs::write(&old_temp_file, "fake parquet data").unwrap();
let nested_dir = format!("{}/nested", data_path);
fs::create_dir_all(&nested_dir).unwrap();
let nested_temp = format!("{}/nested_data.parquet.tmp", nested_dir);
fs::write(&nested_temp, "fake nested parquet data").unwrap();
let _db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_cleanup_orphaned_temp_files_with_read_error() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let data_path = format!("{}/data", storage_path);
fs::create_dir_all(&data_path).unwrap();
let unreadable_dir = format!("{}/unreadable", data_path);
fs::create_dir_all(&unreadable_dir).unwrap();
let mut permissions = fs::metadata(&unreadable_dir).unwrap().permissions();
permissions.set_mode(0o000); fs::set_permissions(&unreadable_dir, permissions.clone()).unwrap();
let _db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
let mut permissions = fs::metadata(&unreadable_dir).unwrap().permissions();
permissions.set_mode(0o755);
fs::set_permissions(&unreadable_dir, permissions).unwrap();
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_cleanup_orphaned_temp_files_metadata_error() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let data_path = format!("{}/data", storage_path);
fs::create_dir_all(&data_path).unwrap();
let temp_file = format!("{}/test.parquet.tmp", data_path);
fs::write(&temp_file, "data").unwrap();
let _db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_partition_directory_creation_error() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let table_path = format!("{}/data/test_db/test_table", storage_path);
let mut permissions = fs::metadata(&table_path).unwrap().permissions();
permissions.set_mode(0o444); fs::set_permissions(&table_path, permissions.clone()).unwrap();
let data = json!([{"date": "2025.12.01 10:00:00", "value": 1}]);
let result = db_manager.insert("test_db", "test_table", &data.to_string());
let mut permissions = fs::metadata(&table_path).unwrap().permissions();
permissions.set_mode(0o755);
fs::set_permissions(&table_path, permissions).unwrap();
match result {
Ok(_) => {
println!("Insert succeeded despite read-only directory (system-dependent)");
}
Err(e) => {
println!("Expected error for partition creation: {:?}", e);
assert!(e.to_string().contains("partition") || e.to_string().contains("Failed to create"));
}
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_query_with_metadata_read_error() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let metadata_path = format!("{}/metadata.json", storage_path);
fs::remove_file(&metadata_path).unwrap();
let rt = Runtime::new().unwrap();
let result = rt.block_on(db_manager.query("test_db", "SELECT * FROM test_table", None, true, None));
match result {
Ok(_) => {
println!("Query succeeded with missing metadata (recovered)");
}
Err(e) => {
println!("Expected error for missing metadata: {:?}", e);
}
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_register_single_table_with_no_parquet_files() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let rt = Runtime::new().unwrap();
let result = rt.block_on(db_manager.query("test_db", "SELECT * FROM test_table", None, true, None));
match result {
Ok(_) => {
println!("Query on empty table succeeded");
}
Err(e) => {
println!("Expected error for empty table: {:?}", e);
}
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_resolve_table_dir_with_group_user() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let group_dir = format!("{}/group/group_user/test_db/test_table", storage_path);
fs::create_dir_all(&group_dir).unwrap();
let rt = Runtime::new().unwrap();
let result = rt.block_on(db_manager.query("test_db", "SELECT * FROM test_table", Some("group_user"), true, None));
match result {
Ok(_) => {
println!("Query succeeded for group user");
}
Err(e) => {
println!("Expected error for group user without data: {:?}", e);
}
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_resolve_table_dir_error_paths() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let result = db_manager.build_files_list("test_db", "nonexistent_table", None);
assert!(result.is_err(), "Should fail for non-existent table");
let result = db_manager.build_files_list("nonexistent_db", "test_table", None);
assert!(result.is_err(), "Should fail for non-existent database");
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_atomic_file_insert_lock_acquisition() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let handles: Vec<_> = (0..10)
.map(|i| {
let storage_path = storage_path.to_string();
thread::spawn(move || {
let mut db_manager = DatabaseManager::new(&storage_path, 43200, "test_user");
let timestamp = chrono::Utc::now().timestamp() + i;
let date = chrono::DateTime::<chrono::Utc>::from_timestamp(timestamp, 0)
.unwrap()
.format("%Y.%m.%d %H:%M:%S")
.to_string();
let data = json!([{"date": date, "value": i}]);
let _ = db_manager.insert("test_db", "test_table", &data.to_string());
})
})
.collect();
for handle in handles {
let _ = handle.join();
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_read_parquet_file_static_error() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let table_path = format!("{}/data/test_db/test_table", storage_path);
let partition_dir = format!("{}/partition_date=2025-12-01", table_path);
fs::create_dir_all(&partition_dir).unwrap();
let fake_parquet = format!("{}/data.parquet", partition_dir);
fs::write(&fake_parquet, "not a parquet file").unwrap();
let data = json!([{"date": "2025.12.01 10:00:00", "value": 1}]);
let result = db_manager.insert("test_db", "test_table", &data.to_string());
match result {
Ok(_) => {
println!("Insert succeeded (file was overwritten)");
}
Err(e) => {
println!("Expected error for corrupted parquet file: {:?}", e);
}
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_get_metadata_cached_sync_runtime_error() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
let result = db_manager.create_database("test_db");
assert!(result.is_ok(), "Should succeed in sync context");
let rt = Runtime::new().unwrap();
rt.block_on(async {
let result = db_manager.create_database("test_db2");
assert!(result.is_ok(), "Should succeed in async context");
});
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_save_metadata_async_retry_logic() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
for i in 0..10 {
let db_name = format!("test_db_{}", i);
let result = db_manager.create_database(&db_name);
match result {
Ok(_) => println!("Created database {}", db_name),
Err(e) => println!("Error creating database {}: {:?}", db_name, e),
}
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_create_metadata_backup_and_cleanup() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
for i in 0..10 {
let db_name = format!("test_db_{}", i);
db_manager.create_database(&db_name).unwrap();
thread::sleep(Duration::from_millis(10)); }
let backup_dir = format!("{}/metadata.json.backups", storage_path);
if Path::new(&backup_dir).exists() {
let backup_count = fs::read_dir(&backup_dir).unwrap().count();
println!("Created {} backup files", backup_count);
assert!(backup_count <= 5, "Should not exceed MAX_METADATA_BACKUPS (5)");
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_validate_parquet_file_error_paths() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let data = json!([{"date": "2025.12.01 10:00:00", "value": 1}]);
db_manager.insert("test_db", "test_table", &data.to_string()).unwrap();
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_parquet_file_writer_locked_error_paths() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let table_path = format!("{}/data/test_db/test_table", storage_path);
fs::create_dir_all(&table_path).unwrap();
let mut permissions = fs::metadata(&table_path).unwrap().permissions();
permissions.set_mode(0o444);
fs::set_permissions(&table_path, permissions.clone()).unwrap();
let data = json!([{"date": "2025.12.01 10:00:00", "value": 1}]);
let result = db_manager.insert("test_db", "test_table", &data.to_string());
let mut permissions = fs::metadata(&table_path).unwrap().permissions();
permissions.set_mode(0o755);
fs::set_permissions(&table_path, permissions).unwrap();
match result {
Ok(_) => println!("Insert succeeded (system-dependent)"),
Err(e) => {
println!("Expected error for write failure: {:?}", e);
}
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_update_metadata_multiple_calls() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
for _ in 0..5 {
let result = db_manager.update_metadata(storage_path);
match result {
Ok(_) => println!("update_metadata succeeded"),
Err(e) => println!("update_metadata error: {:?}", e),
}
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_get_table_path_validation() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
let result = db_manager.get_table_path("../etc", "passwd");
assert!(result.is_none(), "Should reject path traversal in database name");
let result = db_manager.get_table_path("test_db", "../etc/passwd");
assert!(result.is_none(), "Should reject path traversal in table name");
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_resolve_table_dir_database_no_tables() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("empty_db").unwrap();
let result = db_manager.build_files_list("empty_db", "nonexistent_table", Some("group_user"));
assert!(result.is_err(), "Should fail for non-existent table in empty database");
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_atomic_file_insert_with_empty_records() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let data = json!([{"date": "2025.12.01 10:00:00", "value": 1}]);
let result = db_manager.insert("test_db", "test_table", &data.to_string());
assert!(result.is_ok(), "Insert should succeed");
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_validate_parquet_schema_mismatch() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let data = json!([{"date": "2025.12.01 10:00:00", "value": 1}]);
db_manager.insert("test_db", "test_table", &data.to_string()).unwrap();
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_read_metadata_with_retry() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let metadata_path = format!("{}/metadata.json", storage_path);
fs::create_dir_all(storage_path).unwrap();
fs::write(&metadata_path, "").unwrap();
let _db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_get_metadata_cached_invalidation() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db1").unwrap();
let dbs = db_manager.list_databases().unwrap();
assert!(dbs.contains(&"test_db1".to_string()));
db_manager.create_database("test_db2").unwrap();
let dbs = db_manager.list_databases().unwrap();
assert!(dbs.contains(&"test_db2".to_string()));
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_save_metadata_with_backup() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
for i in 0..3 {
let db_name = format!("test_db_{}", i);
db_manager.create_database(&db_name).unwrap();
thread::sleep(Duration::from_millis(50));
}
let metadata_path = format!("{}/metadata.json", storage_path);
assert!(Path::new(&metadata_path).exists(), "Metadata file should exist");
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_cleanup_old_backups() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
for i in 0..8 {
let db_name = format!("db_{}", i);
db_manager.create_database(&db_name).unwrap();
thread::sleep(Duration::from_millis(20));
}
let backup_dir = format!("{}/metadata.json.backups", storage_path);
if Path::new(&backup_dir).exists() {
let backup_count = fs::read_dir(&backup_dir).unwrap().count();
println!("Backup count: {}", backup_count);
assert!(backup_count <= 5, "Should not exceed 5 backups");
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_update_metadata_with_lock_file() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let result = db_manager.update_metadata(storage_path);
assert!(result.is_ok(), "update_metadata should succeed");
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_create_database_error_handling() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let result = db_manager.create_database("test_db");
match result {
Ok(_) => println!("Database creation is idempotent"),
Err(e) => println!("Expected error for duplicate database: {:?}", e),
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_delete_operations_error_handling() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let result = db_manager.delete_table("test_db", "test_table");
assert!(result.is_ok(), "Delete table should succeed");
let result = db_manager.delete_database("test_db");
assert!(result.is_ok(), "Delete database should succeed");
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_register_table_with_schema_inference_error() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let data = json!([{"date": "2025.12.01 10:00:00", "value": 1}]);
db_manager.insert("test_db", "test_table", &data.to_string()).unwrap();
let rt = Runtime::new().unwrap();
let result = rt.block_on(db_manager.query("test_db", "SELECT * FROM test_table", None, true, None));
assert!(result.is_ok(), "Query should succeed");
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_register_table_with_group_user_no_data() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let group_dir = format!("{}/group/group_user/test_db/test_table", storage_path);
fs::create_dir_all(&group_dir).unwrap();
let rt = Runtime::new().unwrap();
let result = rt.block_on(db_manager.query("test_db", "SELECT * FROM test_table", Some("group_user"), true, None));
match result {
Ok(_) => println!("Query succeeded (empty result)"),
Err(e) => println!("Expected error for group user without data: {:?}", e),
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_parquet_writer_with_sync_errors() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let data = json!([
{"date": "2025.12.01 10:00:00", "value": 1},
{"date": "2025.12.01 10:01:00", "value": 2}
]);
let result = db_manager.insert("test_db", "test_table", &data.to_string());
assert!(result.is_ok(), "Insert should succeed");
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_metadata_backup_without_existing_file() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
fs::create_dir_all(storage_path).unwrap();
let _db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_query_with_partition_limit() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
for i in 0..5 {
let timestamp = chrono::Utc::now().timestamp() + (i * 86400); let date = chrono::DateTime::<chrono::Utc>::from_timestamp(timestamp, 0)
.unwrap()
.format("%Y.%m.%d %H:%M:%S")
.to_string();
let data = json!([{"date": date, "value": i}]);
let _ = db_manager.insert("test_db", "test_table", &data.to_string());
}
let rt = Runtime::new().unwrap();
let result = rt.block_on(db_manager.query("test_db", "SELECT * FROM test_table", None, true, Some(2)));
match result {
Ok(output) => match output {
DataFusionOutput::Json(json_result) => {
println!("Query with partition limit returned {} records", json_result.as_array().unwrap().len());
}
_ => {}
},
Err(e) => println!("Query error: {:?}", e),
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_create_database_with_metadata_save_error() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let metadata_path = format!("{}/metadata.json", storage_path);
let parent_dir = Path::new(&metadata_path).parent().unwrap();
let mut permissions = fs::metadata(parent_dir).unwrap().permissions();
permissions.set_mode(0o444);
fs::set_permissions(parent_dir, permissions.clone()).unwrap();
let result = db_manager.create_database("test_db2");
let mut permissions = fs::metadata(parent_dir).unwrap().permissions();
permissions.set_mode(0o755);
fs::set_permissions(parent_dir, permissions).unwrap();
match result {
Ok(_) => println!("Create succeeded (system-dependent)"),
Err(e) => println!("Expected error for metadata save: {:?}", e),
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_create_table_with_nonexistent_database() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
let result = db_manager.create_table("nonexistent_db", "test_table", &schema.to_string());
assert!(result.is_err(), "Should fail for non-existent database");
let error_msg = result.unwrap_err().to_string();
assert!(error_msg.contains("does not exist"), "Error should mention database doesn't exist");
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_delete_database_with_metadata_save_error() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let metadata_path = format!("{}/metadata.json", storage_path);
let mut permissions = fs::metadata(&metadata_path).unwrap().permissions();
permissions.set_mode(0o444);
fs::set_permissions(&metadata_path, permissions.clone()).unwrap();
let result = db_manager.delete_database("test_db");
let mut permissions = fs::metadata(&metadata_path).unwrap().permissions();
permissions.set_mode(0o644);
fs::set_permissions(&metadata_path, permissions).unwrap();
match result {
Ok(_) => println!("Delete succeeded (system-dependent)"),
Err(e) => {
println!("Expected error for metadata save: {:?}", e);
assert!(e.to_string().contains("metadata") || e.to_string().contains("save"));
}
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_delete_table_with_metadata_save_error() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let metadata_path = format!("{}/metadata.json", storage_path);
let mut permissions = fs::metadata(&metadata_path).unwrap().permissions();
permissions.set_mode(0o444);
fs::set_permissions(&metadata_path, permissions.clone()).unwrap();
let result = db_manager.delete_table("test_db", "test_table");
let mut permissions = fs::metadata(&metadata_path).unwrap().permissions();
permissions.set_mode(0o644);
fs::set_permissions(&metadata_path, permissions).unwrap();
match result {
Ok(_) => println!("Delete succeeded (system-dependent)"),
Err(e) => {
println!("Expected error for metadata save: {:?}", e);
assert!(e.to_string().contains("metadata") || e.to_string().contains("save"));
}
}
cleanup_temp_dir(temp_dir);
}
#[test]
fn test_insert_with_atomic_file_error() {
let temp_dir = create_temp_dir();
let storage_path = temp_dir.to_str().unwrap();
let mut db_manager = DatabaseManager::new(storage_path, 43200, "test_user");
db_manager.create_database("test_db").unwrap();
let schema = json!({
"date": {"type": "int", "required": true, "unique": true, "datetime": true},
"value": {"type": "int", "required": true}
});
db_manager.create_table("test_db", "test_table", &schema.to_string()).unwrap();
let table_path = format!("{}/data/test_db/test_table", storage_path);
let mut permissions = fs::metadata(&table_path).unwrap().permissions();
permissions.set_mode(0o444);
fs::set_permissions(&table_path, permissions.clone()).unwrap();
let data = json!([{"date": "2025.12.01 10:00:00", "value": 1}]);
let result = db_manager.insert("test_db", "test_table", &data.to_string());
let mut permissions = fs::metadata(&table_path).unwrap().permissions();
permissions.set_mode(0o755);
fs::set_permissions(&table_path, permissions).unwrap();
assert!(result.is_err(), "Insert should fail with read-only directory");
let error_msg = result.unwrap_err().to_string();
println!("Error message: {}", error_msg);
cleanup_temp_dir(temp_dir);
}