use crate::timon_engine::cloud_sync::{DatabaseManagerInterface, MockS3Store, S3StoreInterface};
use crate::timon_engine::{cloud_sync::CloudStorageManager, db_manager::DatabaseManager};
use chrono::Utc;
use serde_json::json;
use std::collections::HashMap;
use std::io::Write;
use tempfile::NamedTempFile;
/// In-memory stand-in for the real database manager used by these tests.
///
/// The `DatabaseManagerInterface` impl below answers every call straight from
/// these fields, independent of the database/table arguments it receives.
struct MockDatabaseManager {
    /// Returned verbatim by `get_table_schema`.
    schema: serde_json::Value,
    /// Returned verbatim by `build_files_list`.
    files: Vec<String>,
    /// Returned by `get_username`.
    username: String,
    /// Returned by `get_storage_path`.
    pub storage_path: String,
}
impl MockDatabaseManager {
    /// Local parquet path listed by `new` and `with_schema`; it has the same file name as
    /// the object seeded by `MockS3Store::new`.
    const DEFAULT_FILE: &'static str =
        "tmp/timon_test/data/test_db/test_table/test_table_2023-01_01.parquet";

    /// Schema used by every constructor except `with_schema`.
    fn default_schema() -> serde_json::Value {
        json!({
            "id": {"type": "int", "unique": true},
            "timestamp": {"type": "int", "datetime": true},
            "value": {"type": "float"}
        })
    }

    /// Common constructor: fixed user `testuser` rooted at `tmp/timon_test`.
    fn build(files: Vec<String>, schema: serde_json::Value) -> Self {
        MockDatabaseManager {
            username: "testuser".to_string(),
            storage_path: "tmp/timon_test".to_string(),
            files,
            schema,
        }
    }

    /// Mock listing the single default parquet file with the default schema.
    fn new() -> Self {
        Self::build(vec![Self::DEFAULT_FILE.to_string()], Self::default_schema())
    }

    /// Mock listing exactly `files` (possibly empty) with the default schema.
    fn with_files(files: Vec<String>) -> Self {
        Self::build(files, Self::default_schema())
    }

    /// Mock listing the single default parquet file with a caller-supplied `schema`.
    fn with_schema(schema: serde_json::Value) -> Self {
        Self::build(vec![Self::DEFAULT_FILE.to_string()], schema)
    }
}
impl DatabaseManagerInterface for MockDatabaseManager {
    /// Configured user name, borrowed from the mock.
    fn get_username(&self) -> &str {
        self.username.as_str()
    }

    /// Configured storage root, borrowed from the mock.
    fn get_storage_path(&self) -> &str {
        self.storage_path.as_str()
    }

    /// Ignores every argument and returns a copy of the configured file list.
    fn build_files_list(&self, _db_name: &str, _table_name: &str, _username: Option<&str>) -> Result<Vec<String>, Box<dyn std::error::Error>> {
        let listed = self.files.to_vec();
        Ok(listed)
    }

    /// Ignores the database/table names and returns a copy of the configured schema.
    fn get_table_schema(&self, _db_name: &str, _table_name: &str) -> Result<serde_json::Value, Box<dyn std::error::Error>> {
        let schema = serde_json::Value::clone(&self.schema);
        Ok(schema)
    }
}
impl MockS3Store {
fn new() -> Self {
let mut cloud_files = HashMap::new();
cloud_files.insert(
"testuser/test_db/test_table/2023/01/test_table_2023-01_01.parquet".to_string(),
vec![1, 2, 3, 4], );
let mut modified_times = HashMap::new();
modified_times.insert(
"testuser/test_db/test_table/2023/01/test_table_2023-01_01.parquet".to_string(),
Utc::now(),
);
MockS3Store { cloud_files, modified_times }
}
fn with_future_timestamps() -> Self {
let mut cloud_files = HashMap::new();
cloud_files.insert(
"testuser/test_db/test_table/2023/01/test_table_2023-01_01.parquet".to_string(),
vec![1, 2, 3, 4],
);
let mut modified_times = HashMap::new();
let future_time = Utc::now() + chrono::Duration::hours(24);
modified_times.insert(
"testuser/test_db/test_table/2023/01/test_table_2023-01_01.parquet".to_string(),
future_time,
);
MockS3Store { cloud_files, modified_times }
}
fn with_past_timestamps() -> Self {
let mut cloud_files = HashMap::new();
cloud_files.insert(
"testuser/test_db/test_table/2023/01/test_table_2023-01_01.parquet".to_string(),
vec![1, 2, 3, 4],
);
let mut modified_times = HashMap::new();
let past_time = Utc::now() - chrono::Duration::hours(24);
modified_times.insert("testuser/test_db/test_table/2023/01/test_table_2023-01_01.parquet".to_string(), past_time);
MockS3Store { cloud_files, modified_times }
}
fn empty() -> Self {
MockS3Store {
cloud_files: HashMap::new(),
modified_times: HashMap::new(),
}
}
}
/// Build a `CloudStorageManager` over mocks, backed by a fresh on-disk fixture.
///
/// The fixture root is `tmp/timon_test_<nanos>_<pid>_<seq>/`. Inside it, this creates
/// `data/test_db/test_table/`, `group/` and `merge_workspace/`. It then writes a 4-byte
/// placeholder `test_table_2023-01_01.parquet` into the table directory. A
/// `MockDatabaseManager` rooted there lists that file, and `MockS3Store::new()` serves
/// as the object store.
///
/// Filesystem errors are deliberately ignored (best effort). `cleanup_test_environment`,
/// called from other tests that may run in parallel, removes every `tmp/timon_test_*`
/// directory. Tests that never touch the fixture should not fail because of that.
fn setup_test_environment() -> CloudStorageManager<MockS3Store> {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // A nanosecond timestamp alone can collide between tests started in parallel,
    // so append the process id and a per-process sequence number.
    static SEQUENCE: AtomicU64 = AtomicU64::new(0);
    let seq = SEQUENCE.fetch_add(1, Ordering::Relaxed);
    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let storage_path = format!("tmp/timon_test_{}_{}_{}", timestamp, std::process::id(), seq);

    let table_path = format!("{}/data/test_db/test_table", storage_path);
    // `create_dir_all` on the table directory also creates `data/` and `data/test_db/`.
    for dir in [
        table_path.clone(),
        format!("{}/group", storage_path),
        format!("{}/merge_workspace", storage_path),
    ] {
        let _ = std::fs::create_dir_all(dir);
    }

    let test_file = format!("{}/test_table_2023-01_01.parquet", table_path);
    let _ = std::fs::write(&test_file, [1u8, 2, 3, 4]);

    let db_manager = MockDatabaseManager {
        username: "testuser".to_string(),
        storage_path,
        files: vec![test_file],
        schema: json!({
            "id": {"type": "int", "unique": true},
            "timestamp": {"type": "int", "datetime": true},
            "value": {"type": "float"}
        }),
    };
    CloudStorageManager::<MockS3Store>::new_with_mock(db_manager, MockS3Store::new(), Some("test-bucket"))
}
/// Build a `CloudStorageManager` over mocks whose database manager lists exactly `files`.
///
/// Uses the shared, fixed root `tmp/timon_test`, which is the storage path reported by
/// `MockDatabaseManager::with_files`. It creates these directories:
/// - `data/test_db/test_table/`, `group/` and `merge_workspace/` under the root;
/// - the parent directory of every entry in `files`.
///
/// It then writes a 4-byte placeholder to each file. The object store is
/// `MockS3Store::new()`.
///
/// # Panics
/// Panics if a directory or file cannot be created. A broken fixture then fails here,
/// instead of surfacing later as an unrelated `cloud_sink_parquet` error.
fn setup_test_environment_with_files(files: Vec<String>) -> CloudStorageManager<MockS3Store> {
    let storage_path = "tmp/timon_test";
    let mut dirs = vec![
        format!("{}/data/test_db/test_table", storage_path),
        format!("{}/group", storage_path),
        format!("{}/merge_workspace", storage_path),
    ];
    // Callers may list files outside the default table directory; make sure each
    // file's parent exists, otherwise the write below would fail.
    dirs.extend(
        files
            .iter()
            .filter_map(|file| std::path::Path::new(file).parent())
            .filter(|parent| !parent.as_os_str().is_empty())
            .map(|parent| parent.to_string_lossy().into_owned()),
    );
    for dir in &dirs {
        std::fs::create_dir_all(dir)
            .unwrap_or_else(|e| panic!("failed to create fixture directory {}: {}", dir, e));
    }
    for file in &files {
        std::fs::write(file, [1u8, 2, 3, 4])
            .unwrap_or_else(|e| panic!("failed to write fixture file {}: {}", file, e));
    }
    let db_manager = MockDatabaseManager::with_files(files);
    CloudStorageManager::<MockS3Store>::new_with_mock(db_manager, MockS3Store::new(), Some("test-bucket"))
}
/// Best-effort removal of every `tmp/timon_test_*` directory.
///
/// These cases are all skipped silently: a missing `tmp/`, unreadable entries, non-UTF-8
/// names, and failed removals. The exact name `tmp/timon_test` does not match the prefix
/// and is left in place.
///
/// NOTE(review): this deletes the fixtures of *every* test, not just the caller's. The
/// default test harness runs tests in parallel, so one test's cleanup can remove another
/// test's `tmp/timon_test_*` fixture while that fixture is still in use.
fn cleanup_test_environment() {
    let Ok(entries) = std::fs::read_dir("tmp") else {
        return;
    };
    for entry in entries.flatten() {
        let is_test_fixture =
            matches!(entry.file_name().to_str(), Some(name) if name.starts_with("timon_test_"));
        if is_test_fixture {
            let _ = std::fs::remove_dir_all(entry.path());
        }
    }
}
#[tokio::test]
async fn test_new_cloud_storage_manager() {
    // A real DatabaseManager combined with a mocked object store.
    let manager = CloudStorageManager::<MockS3Store>::new_with_mock(
        DatabaseManager::new("tmp/tests", 30, "ahmed_test"),
        MockS3Store::new(),
        Some("test-bucket"),
    );
    assert_eq!(manager.bucket_name, "test-bucket");
}
#[tokio::test]
async fn test_new() {
    // The bucket name passed to the constructor must be stored unchanged.
    let bucket = "test-bucket";
    let cloud_mgr = CloudStorageManager::<MockS3Store>::new_with_mock(
        MockDatabaseManager::new(),
        MockS3Store::new(),
        Some(bucket),
    );
    assert_eq!(cloud_mgr.bucket_name, bucket);
}
#[tokio::test]
async fn test_cloud_sync_parquet() {
    // Smoke test: syncs January 2023 and only checks that the call returns without
    // panicking; the outcome is intentionally not inspected.
    let manager = setup_test_environment();
    let january = HashMap::from([("start_date", "2023-01-01"), ("end_date", "2023-01-31")]);
    let _outcome = manager.cloud_sync_parquet("test_db", "test_table", &january, None).await;
    cleanup_test_environment();
}
#[tokio::test]
async fn test_cloud_sink_parquet() {
    let manager = setup_test_environment();
    let outcome = manager.cloud_sink_parquet("test_db", "test_table").await;
    // Tear the fixture down before asserting so a failure does not leave it behind.
    cleanup_test_environment();
    assert!(outcome.is_ok(), "cloud_sink_parquet failed: {:?}", outcome.err());
}
#[tokio::test]
async fn test_cloud_fetch_parquet() {
    // Smoke test: fetches January 2023 for `testuser` and only checks that the call
    // returns without panicking; the outcome is intentionally not inspected.
    let manager = setup_test_environment();
    let january = HashMap::from([("start_date", "2023-01-01"), ("end_date", "2023-01-31")]);
    let _outcome = manager
        .cloud_fetch_parquet("testuser", "test_db", "test_table", &january)
        .await;
    cleanup_test_environment();
}
#[tokio::test]
async fn test_upload_to_bucket() {
    let manager = setup_test_environment();
    let mut source = NamedTempFile::new().unwrap();
    writeln!(source, "test content").unwrap();
    let outcome = manager
        .upload_to_bucket(source.path().to_str().unwrap(), "testuser/test_upload.txt")
        .await;
    cleanup_test_environment();
    assert!(outcome.is_ok(), "upload_to_bucket failed: {:?}", outcome.err());
}
#[tokio::test]
async fn test_download_from_bucket() {
    let manager = setup_test_environment();
    let destination = NamedTempFile::new().unwrap();
    // This key is seeded into the mock store by `MockS3Store::new`.
    let seeded_key = "testuser/test_db/test_table/2023/01/test_table_2023-01_01.parquet";
    let outcome = manager
        .download_from_bucket(seeded_key, destination.path().to_str().unwrap())
        .await;
    cleanup_test_environment();
    assert!(outcome.is_ok(), "download_from_bucket failed: {:?}", outcome.err());
}
#[tokio::test]
async fn test_list_cloud_files() {
    let manager = setup_test_environment();
    let listing = manager.list_cloud_files("testuser/test_db/test_table").await;
    // Tear the fixture down first so a failed check does not leave it behind.
    cleanup_test_environment();
    let files = match listing {
        Ok(files) => files,
        Err(err) => panic!("list_cloud_files failed: {:?}", Some(err)),
    };
    assert!(!files.is_empty(), "No files were returned");
}
#[tokio::test]
async fn test_cloud_sink_with_future_timestamps() {
    let root = "tmp/timon_test";
    let table_dir = format!("{}/data/test_db/test_table", root);
    // Best-effort fixture creation; errors are deliberately ignored.
    for dir in [
        format!("{}/data", root),
        format!("{}/group", root),
        format!("{}/merge_workspace", root),
        format!("{}/data/test_db", root),
        table_dir.clone(),
    ] {
        let _ = std::fs::create_dir_all(dir);
    }
    // The cloud copy claims to be modified 24 hours from now.
    let cloud_mgr = CloudStorageManager::<MockS3Store>::new_with_mock(
        MockDatabaseManager::new(),
        MockS3Store::with_future_timestamps(),
        Some("test-bucket"),
    );
    let local_file = format!("{}/test_table_2023-01_01.parquet", table_dir);
    let _ = std::fs::write(&local_file, [1u8, 2, 3, 4]);
    let outcome = cloud_mgr.cloud_sink_parquet("test_db", "test_table").await;
    cleanup_test_environment();
    assert!(outcome.is_ok(), "cloud_sink_parquet with future timestamps failed: {:?}", outcome.err());
}
#[tokio::test]
async fn test_cloud_sink_with_past_timestamps() {
    let root = "tmp/timon_test";
    let table_dir = format!("{}/data/test_db/test_table", root);
    // Best-effort fixture creation; errors are deliberately ignored.
    for dir in [
        format!("{}/data", root),
        format!("{}/group", root),
        format!("{}/merge_workspace", root),
        format!("{}/data/test_db", root),
        table_dir.clone(),
    ] {
        let _ = std::fs::create_dir_all(dir);
    }
    // The cloud copy was last modified 24 hours ago.
    let cloud_mgr = CloudStorageManager::<MockS3Store>::new_with_mock(
        MockDatabaseManager::new(),
        MockS3Store::with_past_timestamps(),
        Some("test-bucket"),
    );
    let local_file = format!("{}/test_table_2023-01_01.parquet", table_dir);
    let _ = std::fs::write(&local_file, [1u8, 2, 3, 4]);
    let outcome = cloud_mgr.cloud_sink_parquet("test_db", "test_table").await;
    cleanup_test_environment();
    assert!(outcome.is_ok(), "cloud_sink_parquet with past timestamps failed: {:?}", outcome.err());
}
#[tokio::test]
async fn test_cloud_sink_with_empty_files() {
    // With no local data files listed, sinking must fail with a descriptive error.
    let db_manager = MockDatabaseManager::with_files(vec![]);
    let mock_s3 = MockS3Store::empty();
    let cloud_mgr = CloudStorageManager::<MockS3Store>::new_with_mock(db_manager, mock_s3, Some("test-bucket"));
    let result = cloud_mgr.cloud_sink_parquet("test_db", "test_table").await;
    let Err(err) = result else {
        panic!("Expected error when no files exist");
    };
    let error_msg = err.to_string();
    // Include the actual message so a wording change is diagnosable from the failure output.
    assert!(
        error_msg.contains("No data files found"),
        "Expected specific error message, got: {}",
        error_msg
    );
}
#[tokio::test]
async fn test_cloud_sync_with_invalid_date_range() {
    // Smoke test: an unparsable `start_date` may be rejected or tolerated. Either outcome
    // is accepted, so this only checks that the call returns without panicking.
    let cloud_mgr = setup_test_environment();
    let date_range = HashMap::from([("start_date", "invalid-date"), ("end_date", "2023-01-31")]);
    let _result = cloud_mgr.cloud_sync_parquet("test_db", "test_table", &date_range, None).await;
    cleanup_test_environment();
}
#[tokio::test]
async fn test_cloud_fetch_with_empty_cloud() {
    // Fetching from a store with no objects must still succeed.
    let cloud_mgr = CloudStorageManager::<MockS3Store>::new_with_mock(
        MockDatabaseManager::new(),
        MockS3Store::empty(),
        Some("test-bucket"),
    );
    let january = HashMap::from([("start_date", "2023-01-01"), ("end_date", "2023-01-31")]);
    let outcome = cloud_mgr
        .cloud_fetch_parquet("testuser", "test_db", "test_table", &january)
        .await;
    assert!(outcome.is_ok(), "Should handle empty cloud storage gracefully");
}
#[tokio::test]
async fn test_upload_with_nonexistent_file() {
    let manager = setup_test_environment();
    // The local source file does not exist, so the upload must fail.
    let missing_local = "nonexistent_file.txt";
    let outcome = manager.upload_to_bucket(missing_local, "testuser/nonexistent.txt").await;
    cleanup_test_environment();
    assert!(outcome.is_err(), "Expected error when uploading nonexistent file");
}
#[tokio::test]
async fn test_download_with_nonexistent_cloud_file() {
    // Smoke test: a missing cloud object may surface as Ok or Err. Either outcome is
    // accepted, so this only checks that the call returns without panicking.
    let cloud_mgr = setup_test_environment();
    let temp_file = NamedTempFile::new().unwrap();
    let download_path = temp_file.path().to_str().unwrap();
    let _result = cloud_mgr
        .download_from_bucket("nonexistent_cloud_file.parquet", download_path)
        .await;
    cleanup_test_environment();
}
#[tokio::test]
async fn test_list_cloud_files_with_empty_prefix() {
    // An empty prefix lists the whole bucket and must not be treated as an error.
    let manager = setup_test_environment();
    let listing = manager.list_cloud_files("").await;
    cleanup_test_environment();
    assert!(listing.is_ok(), "Should handle empty prefix gracefully");
}
#[tokio::test]
async fn test_cloud_sink_with_multiple_files() {
    // Three local parts for January 2023: `_01`, `_02` and `_03`.
    let files: Vec<String> = (1..=3)
        .map(|part| format!("tmp/timon_test/data/test_db/test_table/test_table_2023-01_{:02}.parquet", part))
        .collect();
    let manager = setup_test_environment_with_files(files);
    let outcome = manager.cloud_sink_parquet("test_db", "test_table").await;
    cleanup_test_environment();
    assert!(outcome.is_ok(), "cloud_sink_parquet with multiple files failed: {:?}", outcome.err());
}
#[tokio::test]
async fn test_cloud_sync_with_different_username() {
    // Smoke test: syncs January 2023 on behalf of another user and only checks that the
    // call returns without panicking; the outcome is intentionally not inspected.
    let manager = setup_test_environment();
    let january = HashMap::from([("start_date", "2023-01-01"), ("end_date", "2023-01-31")]);
    let other_user = Some("different_user");
    let _outcome = manager
        .cloud_sync_parquet("test_db", "test_table", &january, other_user)
        .await;
    cleanup_test_environment();
}
#[tokio::test]
async fn test_error_handling_in_process_sink_parquet_file() {
    let storage_path = "tmp/timon_test";
    let table_path = format!("{}/data/test_db/test_table", storage_path);
    // Best-effort fixture creation; `create_dir_all` on the table dir also creates `data/test_db/`.
    for dir in [
        table_path.clone(),
        format!("{}/group", storage_path),
        format!("{}/merge_workspace", storage_path),
    ] {
        let _ = std::fs::create_dir_all(dir);
    }
    let db_manager = MockDatabaseManager::new();
    let mock_s3 = MockS3Store::new();
    let cloud_mgr = CloudStorageManager::<MockS3Store>::new_with_mock(db_manager, mock_s3, Some("test-bucket"));
    // NOTE(review): `MockDatabaseManager::new()` lists only the default parquet file, so this
    // stray file is never handed to `cloud_sink_parquet` by the mock. Confirm whether the test
    // should use `MockDatabaseManager::with_files` to actually exercise it.
    let invalid_file = format!("{}/invalid_filename.txt", table_path);
    let _ = std::fs::write(&invalid_file, [1u8, 2, 3, 4]);
    let result = cloud_mgr.cloud_sink_parquet("test_db", "test_table").await;
    // `tmp/timon_test` is shared and is not matched by `cleanup_test_environment`
    // (which only removes `timon_test_*`), so delete the stray file explicitly.
    let _ = std::fs::remove_file(&invalid_file);
    cleanup_test_environment();
    assert!(result.is_ok(), "Should handle invalid filename gracefully: {:?}", result.err());
}
#[tokio::test]
async fn test_cloud_storage_manager_with_various_configs() {
    // (endpoint, bucket, access key, secret key, region); only the bucket reaches the
    // mock constructor, the other columns just describe the scenario.
    let configs = vec![
        ("https://s3.amazonaws.com", "test-bucket", "access-key", "secret-key", "us-east-1"),
        ("https://s3.us-west-2.amazonaws.com", "my-bucket", "key1", "secret1", "us-west-2"),
        ("https://s3.eu-west-1.amazonaws.com", "eu-bucket", "key2", "secret2", "eu-west-1"),
    ];
    for bucket in configs.iter().map(|&(_, bucket, ..)| bucket) {
        let manager = CloudStorageManager::<MockS3Store>::new_with_mock(
            MockDatabaseManager::new(),
            MockS3Store::new(),
            Some(bucket),
        );
        assert_eq!(manager.bucket_name, bucket);
    }
}
#[tokio::test]
async fn test_cloud_sync_with_various_date_ranges() {
    // Smoke test: syncs several month-long ranges. Either Ok or Err is accepted for each
    // call, so this only checks that none of them panics.
    let cloud_mgr = setup_test_environment();
    let date_ranges = [
        HashMap::from([("start_date", "2023-01-01"), ("end_date", "2023-01-31")]),
        HashMap::from([("start_date", "2023-02-01"), ("end_date", "2023-02-28")]),
        HashMap::from([("start_date", "2023-12-01"), ("end_date", "2023-12-31")]),
    ];
    for date_range in &date_ranges {
        let _ = cloud_mgr.cloud_sync_parquet("test_db", "test_table", date_range, None).await;
    }
    cleanup_test_environment();
}
#[tokio::test]
async fn test_cloud_sink_with_various_scenarios() {
    // Smoke test: sinks with the default fixture and with a two-file fixture. Either Ok or
    // Err is accepted for each, so this only checks that neither call panics.
    let scenarios = vec![
        setup_test_environment(),
        setup_test_environment_with_files(vec![
            "tmp/timon_test/data/test_db/test_table/test_table_2023-01_01.parquet".to_string(),
            "tmp/timon_test/data/test_db/test_table/test_table_2023-01_02.parquet".to_string(),
        ]),
    ];
    for cloud_mgr in scenarios {
        let _ = cloud_mgr.cloud_sink_parquet("test_db", "test_table").await;
    }
    cleanup_test_environment();
}
#[tokio::test]
async fn test_cloud_fetch_with_various_scenarios() {
    // Smoke test: fetches with two month ranges and with an empty range map. Either Ok
    // or Err is accepted for each, so this only checks that none of the calls panics.
    let cloud_mgr = setup_test_environment();
    let scenarios = [
        HashMap::from([("start_date", "2023-01-01"), ("end_date", "2023-01-31")]),
        HashMap::from([("start_date", "2023-02-01"), ("end_date", "2023-02-28")]),
        HashMap::new(),
    ];
    for date_range in &scenarios {
        let _ = cloud_mgr
            .cloud_fetch_parquet("testuser", "test_db", "test_table", date_range)
            .await;
    }
    cleanup_test_environment();
}
#[tokio::test]
async fn test_upload_with_various_files() {
    // Uploading readable local files must succeed, as `test_upload_to_bucket` requires.
    // Failures are collected so cleanup still runs before the assertion.
    let cloud_mgr = setup_test_environment();
    let mut failures = Vec::new();
    for i in 0..3 {
        let mut temp_file = NamedTempFile::new().unwrap();
        writeln!(temp_file, "test content {}", i).unwrap();
        let file_path = temp_file.path().to_str().unwrap();
        let cloud_path = format!("testuser/test_upload_{}.txt", i);
        if let Err(err) = cloud_mgr.upload_to_bucket(file_path, &cloud_path).await {
            failures.push(format!("{}: {:?}", cloud_path, err));
        }
    }
    cleanup_test_environment();
    assert!(failures.is_empty(), "upload_to_bucket failed for: {:?}", failures);
}
#[tokio::test]
async fn test_download_with_various_paths() {
    let cloud_mgr = setup_test_environment();
    // (cloud key, must succeed). Only the first key is seeded by `MockS3Store::new`, and
    // `test_download_from_bucket` already requires it to download; either outcome is
    // accepted for the others.
    let cases = [
        ("testuser/test_db/test_table/2023/01/test_table_2023-01_01.parquet", true),
        ("testuser/test_db/test_table/2023/02/test_table_2023-02_01.parquet", false),
        ("nonexistent_file.parquet", false),
    ];
    let mut failures = Vec::new();
    for (cloud_path, must_succeed) in cases {
        let temp_file = NamedTempFile::new().unwrap();
        let download_path = temp_file.path().to_str().unwrap();
        let result = cloud_mgr.download_from_bucket(cloud_path, download_path).await;
        if let (true, Err(err)) = (must_succeed, result) {
            failures.push(format!("{}: {:?}", cloud_path, err));
        }
    }
    cleanup_test_environment();
    assert!(failures.is_empty(), "download_from_bucket failed for: {:?}", failures);
}
#[tokio::test]
async fn test_list_cloud_files_with_various_prefixes() {
    let cloud_mgr = setup_test_environment();
    // (prefix, must succeed). `test_list_cloud_files` and
    // `test_list_cloud_files_with_empty_prefix` already require Ok for the table prefix
    // and the empty prefix; either outcome is accepted for the others.
    let cases = [
        ("testuser/test_db/test_table", true),
        ("testuser/test_db", false),
        ("testuser", false),
        ("", true),
        ("nonexistent/prefix", false),
    ];
    let mut failures = Vec::new();
    for (prefix, must_succeed) in cases {
        let result = cloud_mgr.list_cloud_files(prefix).await;
        if let (true, Err(err)) = (must_succeed, result) {
            failures.push(format!("{:?}: {:?}", prefix, err));
        }
    }
    cleanup_test_environment();
    assert!(failures.is_empty(), "list_cloud_files failed for: {:?}", failures);
}
#[tokio::test]
async fn test_error_handling_in_cloud_operations() {
    // Smoke test: runs sink, sync and fetch against an empty `MockS3Store`. Either Ok or
    // Err is accepted for each call, so this only checks that none of them panics.
    let db_manager = MockDatabaseManager::new();
    let mock_s3 = MockS3Store::empty();
    let cloud_mgr = CloudStorageManager::<MockS3Store>::new_with_mock(db_manager, mock_s3, Some("test-bucket"));
    let _ = cloud_mgr.cloud_sink_parquet("test_db", "test_table").await;
    let date_range = HashMap::from([("start_date", "2023-01-01"), ("end_date", "2023-01-31")]);
    let _ = cloud_mgr.cloud_sync_parquet("test_db", "test_table", &date_range, None).await;
    let _ = cloud_mgr
        .cloud_fetch_parquet("testuser", "test_db", "test_table", &date_range)
        .await;
}
#[tokio::test]
async fn test_concurrent_cloud_operations() {
let cloud_mgr = setup_test_environment();
for i in 0..3 {
let mut temp_file = NamedTempFile::new().unwrap();
writeln!(temp_file, "concurrent content {}", i).unwrap();
let file_path = temp_file.path().to_str().unwrap();
let cloud_path = format!("testuser/concurrent_upload_{}.txt", i);
let result = cloud_mgr.upload_to_bucket(file_path, &cloud_path).await;
assert!(result.is_ok() || result.is_err());
}
cleanup_test_environment();
}
#[tokio::test]
async fn test_large_file_handling() {
    let cloud_mgr = setup_test_environment();
    let mut large_file = NamedTempFile::new().unwrap();
    // One line of 10 000 'x' characters plus a newline.
    writeln!(large_file, "{}", "x".repeat(10000)).unwrap();
    let file_path = large_file.path().to_str().unwrap();
    let result = cloud_mgr.upload_to_bucket(file_path, "testuser/large_file.txt").await;
    cleanup_test_environment();
    assert!(result.is_ok(), "upload_to_bucket of a large file failed: {:?}", result.err());
}
#[tokio::test]
async fn test_special_character_handling() {
    // Smoke test: uploads under keys with spaces, non-ASCII, dashes and underscores.
    // Either Ok or Err is accepted for each, so this only checks that none panics.
    let cloud_mgr = setup_test_environment();
    let special_files = [
        "testuser/file with spaces.txt",
        "testuser/file_with_unicode_测试.txt",
        "testuser/file-with-dashes.txt",
        "testuser/file_with_underscores.txt",
    ];
    for cloud_path in special_files {
        let mut temp_file = NamedTempFile::new().unwrap();
        writeln!(temp_file, "special content").unwrap();
        let file_path = temp_file.path().to_str().unwrap();
        let _ = cloud_mgr.upload_to_bucket(file_path, cloud_path).await;
    }
    cleanup_test_environment();
}
#[tokio::test]
async fn test_memory_efficient_operations() {
    // 100 small sequential uploads; each must succeed, as `test_upload_to_bucket` requires.
    // Each temp file is dropped at the end of its iteration.
    let cloud_mgr = setup_test_environment();
    let mut failures = Vec::new();
    for i in 0..100 {
        let mut temp_file = NamedTempFile::new().unwrap();
        writeln!(temp_file, "small content {}", i).unwrap();
        let file_path = temp_file.path().to_str().unwrap();
        let cloud_path = format!("testuser/small_file_{}.txt", i);
        if let Err(err) = cloud_mgr.upload_to_bucket(file_path, &cloud_path).await {
            failures.push(format!("{}: {:?}", cloud_path, err));
        }
    }
    cleanup_test_environment();
    assert!(failures.is_empty(), "upload_to_bucket failed for: {:?}", failures);
}
#[tokio::test]
async fn test_network_error_simulation() {
    // NOTE(review): despite the name, nothing here injects a network failure; it only
    // uploads into an empty `MockS3Store`. Either outcome is accepted, so this is a smoke
    // test that checks the call returns without panicking.
    let db_manager = MockDatabaseManager::new();
    let mock_s3 = MockS3Store::empty();
    let cloud_mgr = CloudStorageManager::<MockS3Store>::new_with_mock(db_manager, mock_s3, Some("test-bucket"));
    let mut temp_file = NamedTempFile::new().unwrap();
    writeln!(temp_file, "test content").unwrap();
    let file_path = temp_file.path().to_str().unwrap();
    let _result = cloud_mgr.upload_to_bucket(file_path, "testuser/network_test.txt").await;
}
#[tokio::test]
async fn test_retry_mechanism() {
    // NOTE(review): no failure is injected, so no retry path is exercised. This performs
    // three ordinary uploads, each of which must succeed (as `test_upload_to_bucket` requires).
    let cloud_mgr = setup_test_environment();
    let mut failures = Vec::new();
    for attempt in 0..3 {
        let mut temp_file = NamedTempFile::new().unwrap();
        writeln!(temp_file, "retry attempt {}", attempt).unwrap();
        let file_path = temp_file.path().to_str().unwrap();
        let cloud_path = format!("testuser/retry_test_{}.txt", attempt);
        if let Err(err) = cloud_mgr.upload_to_bucket(file_path, &cloud_path).await {
            failures.push(format!("{}: {:?}", cloud_path, err));
        }
    }
    cleanup_test_environment();
    assert!(failures.is_empty(), "upload_to_bucket failed for: {:?}", failures);
}
#[tokio::test]
async fn test_batch_operations() {
    // Ten sequential uploads; each must succeed, as `test_upload_to_bucket` requires.
    let cloud_mgr = setup_test_environment();
    let mut failures = Vec::new();
    for i in 0..10 {
        let mut temp_file = NamedTempFile::new().unwrap();
        writeln!(temp_file, "batch content {}", i).unwrap();
        let file_path = temp_file.path().to_str().unwrap();
        let cloud_path = format!("testuser/batch_file_{}.txt", i);
        if let Err(err) = cloud_mgr.upload_to_bucket(file_path, &cloud_path).await {
            failures.push(format!("{}: {:?}", cloud_path, err));
        }
    }
    cleanup_test_environment();
    assert!(failures.is_empty(), "upload_to_bucket failed for: {:?}", failures);
}
#[tokio::test]
async fn test_metadata_operations() {
    let cloud_mgr = setup_test_environment();
    let mut temp_file = NamedTempFile::new().unwrap();
    writeln!(temp_file, "metadata test content").unwrap();
    let file_path = temp_file.path().to_str().unwrap();
    // The upload must succeed, as `test_upload_to_bucket` requires.
    let upload = cloud_mgr.upload_to_bucket(file_path, "testuser/metadata/test_file.txt").await;
    // Listing is a smoke check only: either Ok or Err is accepted.
    let _ = cloud_mgr.list_cloud_files("testuser/metadata").await;
    cleanup_test_environment();
    assert!(upload.is_ok(), "upload_to_bucket failed: {:?}", upload.err());
}
#[tokio::test]
async fn test_performance_under_load() {
    // Twenty sequential uploads; each must succeed, as `test_upload_to_bucket` requires.
    // No timing is measured.
    let cloud_mgr = setup_test_environment();
    let mut failures = Vec::new();
    for i in 0..20 {
        let mut temp_file = NamedTempFile::new().unwrap();
        writeln!(temp_file, "performance test {}", i).unwrap();
        let file_path = temp_file.path().to_str().unwrap();
        let cloud_path = format!("testuser/performance_test_{}.txt", i);
        if let Err(err) = cloud_mgr.upload_to_bucket(file_path, &cloud_path).await {
            failures.push(format!("{}: {:?}", cloud_path, err));
        }
    }
    cleanup_test_environment();
    assert!(failures.is_empty(), "upload_to_bucket failed for: {:?}", failures);
}
#[tokio::test]
async fn test_error_recovery() {
    let cloud_mgr = setup_test_environment();
    let mut temp_file = NamedTempFile::new().unwrap();
    writeln!(temp_file, "recovery test content").unwrap();
    let file_path = temp_file.path().to_str().unwrap();
    // The upload must succeed, as `test_upload_to_bucket` requires.
    let upload = cloud_mgr.upload_to_bucket(file_path, "testuser/recovery_test.txt").await;
    // Reading the object back is a smoke check only; the mock store may not persist
    // uploads, so either outcome is accepted.
    let download_file = NamedTempFile::new().unwrap();
    let download_path = download_file.path().to_str().unwrap();
    let _ = cloud_mgr.download_from_bucket("testuser/recovery_test.txt", download_path).await;
    cleanup_test_environment();
    assert!(upload.is_ok(), "upload_to_bucket failed: {:?}", upload.err());
}
#[tokio::test]
async fn test_resource_cleanup() {
    let cloud_mgr = setup_test_environment();
    let mut temp_file = NamedTempFile::new().unwrap();
    writeln!(temp_file, "cleanup test content").unwrap();
    let file_path = temp_file.path().to_str().unwrap();
    // The upload must succeed, as `test_upload_to_bucket` requires.
    let upload = cloud_mgr.upload_to_bucket(file_path, "testuser/cleanup_test.txt").await;
    // Listing is a smoke check only: either Ok or Err is accepted.
    let _ = cloud_mgr.list_cloud_files("testuser").await;
    cleanup_test_environment();
    assert!(upload.is_ok(), "upload_to_bucket failed: {:?}", upload.err());
}
#[tokio::test]
async fn test_edge_case_parameters() {
    // Smoke test: uploads to keys that are empty, have leading/trailing slashes, or contain
    // doubled slashes. Either Ok or Err is accepted for each, so this only checks that none
    // of the calls panics.
    let cloud_mgr = setup_test_environment();
    let edge_cases = [
        ("", "empty_path.txt"),
        ("testuser/", "trailing_slash.txt"),
        ("/testuser", "leading_slash.txt"),
        ("testuser//double//slash.txt", "double_slash.txt"),
    ];
    for (cloud_path, description) in edge_cases {
        let mut temp_file = NamedTempFile::new().unwrap();
        writeln!(temp_file, "edge case: {}", description).unwrap();
        let file_path = temp_file.path().to_str().unwrap();
        let _ = cloud_mgr.upload_to_bucket(file_path, cloud_path).await;
    }
    cleanup_test_environment();
}
#[tokio::test]
async fn test_large_scale_operations() {
    // Fifty uploads under one prefix, each of which must succeed (as `test_upload_to_bucket`
    // requires). The final listing is a smoke check only.
    let cloud_mgr = setup_test_environment();
    let mut failures = Vec::new();
    for i in 0..50 {
        let mut temp_file = NamedTempFile::new().unwrap();
        writeln!(temp_file, "large scale content {}", i).unwrap();
        let file_path = temp_file.path().to_str().unwrap();
        let cloud_path = format!("testuser/large_scale/file_{}.txt", i);
        if let Err(err) = cloud_mgr.upload_to_bucket(file_path, &cloud_path).await {
            failures.push(format!("{}: {:?}", cloud_path, err));
        }
    }
    let _ = cloud_mgr.list_cloud_files("testuser/large_scale").await;
    cleanup_test_environment();
    assert!(failures.is_empty(), "upload_to_bucket failed for: {:?}", failures);
}
#[tokio::test]
async fn test_complex_scenarios() {
    // Uploads must succeed (as `test_upload_to_bucket` requires); downloads and listings
    // are smoke checks whose outcome is not inspected.
    let cloud_mgr = setup_test_environment();
    let mut upload_failures = Vec::new();
    {
        // Scenario 1: upload a file, then try to read the same key back.
        let mut temp_file = NamedTempFile::new().unwrap();
        writeln!(temp_file, "complex scenario 1").unwrap();
        let file_path = temp_file.path().to_str().unwrap();
        let cloud_path = "testuser/complex/scenario1.txt";
        if let Err(err) = cloud_mgr.upload_to_bucket(file_path, cloud_path).await {
            upload_failures.push(format!("{}: {:?}", cloud_path, err));
        }
        let download_file = NamedTempFile::new().unwrap();
        let download_path = download_file.path().to_str().unwrap();
        let _ = cloud_mgr.download_from_bucket(cloud_path, download_path).await;
    }
    {
        // Scenario 2: list the prefix, then upload a second file under it.
        let _ = cloud_mgr.list_cloud_files("testuser/complex").await;
        let mut temp_file = NamedTempFile::new().unwrap();
        writeln!(temp_file, "complex scenario 2").unwrap();
        let file_path = temp_file.path().to_str().unwrap();
        let cloud_path = "testuser/complex/scenario2.txt";
        if let Err(err) = cloud_mgr.upload_to_bucket(file_path, cloud_path).await {
            upload_failures.push(format!("{}: {:?}", cloud_path, err));
        }
    }
    cleanup_test_environment();
    assert!(upload_failures.is_empty(), "upload_to_bucket failed for: {:?}", upload_failures);
}
#[tokio::test]
async fn test_database_manager_interface_get_table_schema() {
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Deletes the fixture directory when dropped, so it is removed even if an assertion panics.
    struct RemoveDirOnDrop(String);
    impl Drop for RemoveDirOnDrop {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    // Intentionally not prefixed `timon_test_`. `cleanup_test_environment`, run by other
    // parallel tests, deletes every `tmp/timon_test_*` directory and could remove this
    // fixture mid-test.
    let storage_path = format!("tmp/timon_dbi_schema_{}_{}", timestamp, std::process::id());
    // Declared before `db_manager`, so it is dropped (and the directory removed) after it.
    let _fixture = RemoveDirOnDrop(storage_path.clone());
    let data_path = format!("{}/data", storage_path);
    std::fs::create_dir_all(&data_path).unwrap();
    let metadata = json!({
        "databases": {
            "test_db": {
                "tables": {
                    "test_table": {
                        "path": format!("{}/test_db/test_table", data_path),
                        "schema": {
                            "id": {"type": "int", "unique": true},
                            "name": {"type": "string"}
                        }
                    }
                }
            }
        }
    });
    std::fs::write(format!("{}/metadata.json", storage_path), serde_json::to_string(&metadata).unwrap()).unwrap();
    let base_table_path = format!("{}/test_db/test_table", data_path);
    std::fs::create_dir_all(&base_table_path).unwrap();
    let db_manager = DatabaseManager::new(&storage_path, 30, "testuser");
    let schema = db_manager
        .get_table_schema("test_db", "test_table")
        .expect("get_table_schema failed for a table declared in metadata.json");
    assert!(schema.get("id").is_some(), "schema is missing `id`: {}", schema);
    assert!(schema.get("name").is_some(), "schema is missing `name`: {}", schema);
}
#[tokio::test]
async fn test_database_manager_interface_get_storage_path() {
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Deletes the fixture directory when dropped, so it is removed even if an assertion panics.
    struct RemoveDirOnDrop(String);
    impl Drop for RemoveDirOnDrop {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    // Intentionally not prefixed `timon_test_`. `cleanup_test_environment`, run by other
    // parallel tests, deletes every `tmp/timon_test_*` directory and could remove this
    // fixture mid-test.
    let storage_path = format!("tmp/timon_dbi_storage_path_{}_{}", timestamp, std::process::id());
    let _fixture = RemoveDirOnDrop(storage_path.clone());
    let data_path = format!("{}/data", storage_path);
    std::fs::create_dir_all(&data_path).unwrap();
    let metadata = json!({
        "databases": {}
    });
    std::fs::write(format!("{}/metadata.json", storage_path), serde_json::to_string(&metadata).unwrap()).unwrap();
    let db_manager = DatabaseManager::new(&storage_path, 30, "testuser");
    let result = db_manager.get_storage_path();
    assert_eq!(result, storage_path);
}
#[tokio::test]
async fn test_mock_s3_store_head_success() {
    use object_store::path::Path as StorePath;

    // A single 5-byte object; `store_head` must report its size and location.
    let key = "test/path/file.parquet";
    let mock_store = MockS3Store {
        cloud_files: HashMap::from([(key.to_string(), vec![1, 2, 3, 4, 5])]),
        modified_times: HashMap::from([(key.to_string(), Utc::now())]),
    };
    let result = mock_store.store_head(&StorePath::from(key)).await;
    assert!(result.is_ok());
    let meta = result.unwrap();
    assert_eq!(meta.size, 5);
    assert_eq!(meta.location.to_string(), key);
}
#[tokio::test]
async fn test_mock_s3_store_head_not_found() {
let mock_store = MockS3Store {
cloud_files: HashMap::new(),
modified_times: HashMap::new(),
};
use object_store::path::Path as StorePath;
let path = StorePath::from("nonexistent/file.parquet");
let result = mock_store.store_head(&path).await;
assert!(result.is_err());
let error_msg = result.unwrap_err().to_string();
assert!(error_msg.contains("NotFound"));
}
#[tokio::test]
async fn test_cloud_storage_manager_new() {
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Deletes the fixture directory when dropped, so it is removed even if construction panics.
    struct RemoveDirOnDrop(String);
    impl Drop for RemoveDirOnDrop {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    // Intentionally not prefixed `timon_test_`. `cleanup_test_environment`, run by other
    // parallel tests, deletes every `tmp/timon_test_*` directory and could remove this
    // fixture mid-test.
    let storage_path = format!("tmp/timon_csm_new_{}_{}", timestamp, std::process::id());
    let _fixture = RemoveDirOnDrop(storage_path.clone());
    let data_path = format!("{}/data", storage_path);
    std::fs::create_dir_all(&data_path).unwrap();
    let metadata = json!({
        "databases": {}
    });
    std::fs::write(format!("{}/metadata.json", storage_path), serde_json::to_string(&metadata).unwrap()).unwrap();
    let db_manager = DatabaseManager::new(&storage_path, 30, "testuser");
    // Construction only; the returned value is intentionally not inspected.
    let _result = CloudStorageManager::<object_store::aws::AmazonS3>::new(
        db_manager,
        "http://localhost:9000",
        "test_key",
        "test_secret",
        "test_bucket",
        "us-west-1",
    );
}
/// Writes one local partition (`partition_date=2023-01-15`, columns `id`/`value`) while the
/// mock store already holds an object at
/// `testuser/test_db/test_table/2023/01/test_table_2023-01-15.parquet` stamped one hour in
/// the past. This is presumably the key the sink targets for that partition (verify against
/// `cloud_sink_parquet`). Asserts that `cloud_sink_parquet` succeeds.
#[tokio::test]
async fn test_cloud_sink_parquet_merge_target_paths() {
    use datafusion::arrow::array::{Float64Array, Int64Array};
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::parquet::arrow::ArrowWriter;
    use std::process;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;
    use std::time::{SystemTime, UNIX_EPOCH};

    // Timestamp + PID + counter keep the scratch directory unique across parallel test runs.
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let counter = COUNTER.fetch_add(1, Ordering::Relaxed);
    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let pid = process::id();
    let storage_path = format!("tmp/timon_test_merge_{}_{}_{}", timestamp, pid, counter);
    let data_path = format!("{}/data", storage_path);
    let merge_path = format!("{}/merge_workspace/testuser", storage_path);
    let base_table_path = format!("{}/test_db/test_table", data_path);
    std::fs::create_dir_all(&data_path).unwrap();
    std::fs::create_dir_all(&merge_path).unwrap();

    let metadata = json!({
        "databases": {
            "test_db": {
                "tables": {
                    "test_table": {
                        "path": &base_table_path,
                        "schema": {
                            "id": {"type": "int", "unique": true},
                            "value": {"type": "float"}
                        }
                    }
                }
            }
        }
    });
    std::fs::write(format!("{}/metadata.json", storage_path), serde_json::to_string(&metadata).unwrap()).unwrap();

    // Create the table directory once, then the single partition underneath it.
    std::fs::create_dir_all(&base_table_path).unwrap();
    let table_path = format!("{}/partition_date=2023-01-15", base_table_path);
    std::fs::create_dir_all(&table_path).unwrap();

    // Local parquet data for the partition.
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Float64, false),
    ]);
    let id_array = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let value_array = Arc::new(Float64Array::from(vec![10.5, 20.5, 30.5]));
    let batch = RecordBatch::try_new(Arc::new(schema), vec![id_array, value_array]).unwrap();
    let parquet_file = format!("{}/data.parquet", table_path);
    let file = std::fs::File::create(&parquet_file).unwrap();
    let mut writer = ArrowWriter::try_new(file, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    // Pre-existing cloud object, last modified one hour ago.
    let mut cloud_files = HashMap::new();
    let s3_path = "testuser/test_db/test_table/2023/01/test_table_2023-01-15.parquet";
    cloud_files.insert(s3_path.to_string(), vec![1, 2, 3]);
    let mut modified_times = HashMap::new();
    let past_time = Utc::now() - chrono::Duration::hours(1);
    modified_times.insert(s3_path.to_string(), past_time);
    let mock_s3 = MockS3Store { cloud_files, modified_times };

    let db_manager = DatabaseManager::new(&storage_path, 30, "testuser");
    let cloud_mgr = CloudStorageManager::<MockS3Store>::new_with_mock(db_manager, mock_s3, Some("test-bucket"));
    let result = cloud_mgr.cloud_sink_parquet("test_db", "test_table").await;

    // Clean up before asserting so a failure does not leak the scratch directory.
    let _ = std::fs::remove_dir_all(&storage_path);
    assert!(result.is_ok(), "cloud_sink_parquet should succeed: {:?}", result.err());
}
/// The cloud holds one January and one February 2023 object. Fetching with a
/// January-only date range must complete without error.
#[tokio::test]
async fn test_cloud_fetch_parquet_filter_files_by_date_range() {
    use std::process;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Unique scratch root: per-test sequence number, nanosecond timestamp and process id.
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let root = format!("tmp/timon_test_fetch_filter_{}_{}_{}", nanos, process::id(), seq);
    let data_dir = format!("{}/data", root);
    let table_dir = format!("{}/test_db/test_table", data_dir);

    std::fs::create_dir_all(&data_dir).unwrap();
    std::fs::create_dir_all(format!("{}/group", root)).unwrap();
    let metadata = json!({
        "databases": {
            "test_db": {
                "tables": {
                    "test_table": {
                        "path": &table_dir,
                        "schema": {}
                    }
                }
            }
        }
    });
    std::fs::write(format!("{}/metadata.json", root), serde_json::to_string(&metadata).unwrap()).unwrap();
    std::fs::create_dir_all(&table_dir).unwrap();

    let object_keys = [
        "testuser/test_db/test_table/2023/01/test_table_2023-01-15.parquet",
        "testuser/test_db/test_table/2023/02/test_table_2023-02-15.parquet",
    ];
    let cloud_files: HashMap<_, _> = object_keys
        .iter()
        .map(|key| (key.to_string(), vec![1, 2, 3]))
        .collect();
    let modified_times: HashMap<_, _> = object_keys
        .iter()
        .map(|key| (key.to_string(), Utc::now()))
        .collect();
    let store = MockS3Store { cloud_files, modified_times };
    let db = DatabaseManager::new(&root, 30, "testuser");
    let cloud_mgr = CloudStorageManager::<MockS3Store>::new_with_mock(db, store, Some("test-bucket"));

    let date_range = HashMap::from([("start_date", "2023-01-01"), ("end_date", "2023-01-31")]);
    let outcome = cloud_mgr.cloud_fetch_parquet("testuser", "test_db", "test_table", &date_range).await;

    let _ = std::fs::remove_dir_all(&root);
    assert!(outcome.is_ok());
}
/// The group directory already contains two local parquet files whose names differ from
/// the single January cloud object. A January fetch must complete without error.
#[tokio::test]
async fn test_cloud_fetch_parquet_read_dir_and_filter() {
    use std::process;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Unique scratch root: per-test sequence number, nanosecond timestamp and process id.
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let root = format!("tmp/timon_test_fetch_readdir_{}_{}_{}", nanos, process::id(), seq);
    let data_dir = format!("{}/data", root);
    let group_dir = format!("{}/group/testuser/test_db/test_table", root);
    let table_dir = format!("{}/test_db/test_table", data_dir);

    for dir in [&data_dir, &group_dir] {
        std::fs::create_dir_all(dir).unwrap();
    }
    let metadata = json!({
        "databases": {
            "test_db": {
                "tables": {
                    "test_table": {
                        "path": &table_dir,
                        "schema": {}
                    }
                }
            }
        }
    });
    std::fs::write(format!("{}/metadata.json", root), serde_json::to_string(&metadata).unwrap()).unwrap();
    std::fs::create_dir_all(&table_dir).unwrap();

    // Pre-existing local files in the group directory.
    for local_name in ["old_file.parquet", "another_file.parquet"] {
        std::fs::write(format!("{}/{}", group_dir, local_name), vec![1, 2, 3]).unwrap();
    }

    let january_key = "testuser/test_db/test_table/2023/01/test_table_2023-01-15.parquet";
    let store = MockS3Store {
        cloud_files: HashMap::from([(january_key.to_string(), vec![1, 2, 3])]),
        modified_times: HashMap::from([(january_key.to_string(), Utc::now())]),
    };
    let db = DatabaseManager::new(&root, 30, "testuser");
    let cloud_mgr = CloudStorageManager::<MockS3Store>::new_with_mock(db, store, Some("test-bucket"));

    let date_range = HashMap::from([("start_date", "2023-01-01"), ("end_date", "2023-01-31")]);
    let outcome = cloud_mgr.cloud_fetch_parquet("testuser", "test_db", "test_table", &date_range).await;

    let _ = std::fs::remove_dir_all(&root);
    assert!(outcome.is_ok());
}
/// Two January cloud objects exist: one following the `test_table_<date>.parquet` naming
/// and one (`another_file.parquet`) that does not. A January fetch must complete
/// without error.
#[tokio::test]
async fn test_cloud_fetch_parquet_cloud_filenames() {
    use std::process;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Unique scratch root: per-test sequence number, nanosecond timestamp and process id.
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let root = format!("tmp/timon_test_fetch_filenames_{}_{}_{}", nanos, process::id(), seq);
    let data_dir = format!("{}/data", root);
    let group_dir = format!("{}/group/testuser/test_db/test_table", root);
    let table_dir = format!("{}/test_db/test_table", data_dir);

    for dir in [&data_dir, &group_dir] {
        std::fs::create_dir_all(dir).unwrap();
    }
    let metadata = json!({
        "databases": {
            "test_db": {
                "tables": {
                    "test_table": {
                        "path": &table_dir,
                        "schema": {}
                    }
                }
            }
        }
    });
    std::fs::write(format!("{}/metadata.json", root), serde_json::to_string(&metadata).unwrap()).unwrap();
    std::fs::create_dir_all(&table_dir).unwrap();

    let object_keys = [
        "testuser/test_db/test_table/2023/01/test_table_2023-01-15.parquet",
        "testuser/test_db/test_table/2023/01/another_file.parquet",
    ];
    let cloud_files: HashMap<_, _> = object_keys
        .iter()
        .map(|key| (key.to_string(), vec![1, 2, 3]))
        .collect();
    let modified_times: HashMap<_, _> = object_keys
        .iter()
        .map(|key| (key.to_string(), Utc::now()))
        .collect();
    let store = MockS3Store { cloud_files, modified_times };
    let db = DatabaseManager::new(&root, 30, "testuser");
    let cloud_mgr = CloudStorageManager::<MockS3Store>::new_with_mock(db, store, Some("test-bucket"));

    let date_range = HashMap::from([("start_date", "2023-01-01"), ("end_date", "2023-01-31")]);
    let outcome = cloud_mgr.cloud_fetch_parquet("testuser", "test_db", "test_table", &date_range).await;

    let _ = std::fs::remove_dir_all(&root);
    assert!(outcome.is_ok());
}
/// A local file in the group directory (`out_of_sync_file.parquet`) whose name matches none
/// of the cloud objects must be removed by `cloud_fetch_parquet`, and the fetch must succeed.
#[tokio::test]
async fn test_cloud_fetch_parquet_delete_out_of_sync() {
    use std::process;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Timestamp + PID + counter keep the scratch directory unique across parallel test runs.
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let counter = COUNTER.fetch_add(1, Ordering::Relaxed);
    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let pid = process::id();
    let storage_path = format!("tmp/timon_test_fetch_delete_{}_{}_{}", timestamp, pid, counter);
    let data_path = format!("{}/data", storage_path);
    let group_path = format!("{}/group/testuser/test_db/test_table", storage_path);
    let base_table_path = format!("{}/test_db/test_table", data_path);
    std::fs::create_dir_all(&data_path).unwrap();
    std::fs::create_dir_all(&group_path).unwrap();

    let metadata = json!({
        "databases": {
            "test_db": {
                "tables": {
                    "test_table": {
                        "path": &base_table_path,
                        "schema": {}
                    }
                }
            }
        }
    });
    std::fs::write(format!("{}/metadata.json", storage_path), serde_json::to_string(&metadata).unwrap()).unwrap();
    std::fs::create_dir_all(&base_table_path).unwrap();

    // Local file with no counterpart among the cloud keys below.
    let local_file = format!("{}/out_of_sync_file.parquet", group_path);
    std::fs::write(&local_file, vec![1, 2, 3]).unwrap();
    assert!(std::path::Path::new(&local_file).exists());

    let mut cloud_files = HashMap::new();
    cloud_files.insert(
        "testuser/test_db/test_table/2023/01/test_table_2023-01-15.parquet".to_string(),
        vec![1, 2, 3],
    );
    let mut modified_times = HashMap::new();
    modified_times.insert(
        "testuser/test_db/test_table/2023/01/test_table_2023-01-15.parquet".to_string(),
        Utc::now(),
    );
    let mock_s3 = MockS3Store { cloud_files, modified_times };
    let db_manager = DatabaseManager::new(&storage_path, 30, "testuser");
    let cloud_mgr = CloudStorageManager::<MockS3Store>::new_with_mock(db_manager, mock_s3, Some("test-bucket"));

    let mut date_range = HashMap::new();
    date_range.insert("start_date", "2023-01-01");
    date_range.insert("end_date", "2023-01-31");
    let result = cloud_mgr.cloud_fetch_parquet("testuser", "test_db", "test_table", &date_range).await;

    // Record the observation before cleanup. A failing assertion panics, and if it ran
    // first it would skip `remove_dir_all` and leak the scratch directory under `tmp/`.
    let stale_file_survived = std::path::Path::new(&local_file).exists();
    let _ = std::fs::remove_dir_all(&storage_path);
    assert!(!stale_file_survived, "Out-of-sync file should be deleted");
    assert!(result.is_ok(), "cloud_fetch_parquet should succeed: {:?}", result.err());
}
/// The cloud holds a January and a February object but no local group directory exists.
/// Fetching with a January-only date range must complete without error.
#[tokio::test]
async fn test_cloud_fetch_parquet_continue_when_not_in_filtered() {
    use std::process;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Unique scratch root: per-test sequence number, nanosecond timestamp and process id.
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let root = format!("tmp/timon_test_fetch_continue_{}_{}_{}", nanos, process::id(), seq);
    let data_dir = format!("{}/data", root);
    let table_dir = format!("{}/test_db/test_table", data_dir);

    std::fs::create_dir_all(&data_dir).unwrap();
    let metadata = json!({
        "databases": {
            "test_db": {
                "tables": {
                    "test_table": {
                        "path": &table_dir,
                        "schema": {}
                    }
                }
            }
        }
    });
    std::fs::write(format!("{}/metadata.json", root), serde_json::to_string(&metadata).unwrap()).unwrap();
    std::fs::create_dir_all(&table_dir).unwrap();

    let object_keys = [
        "testuser/test_db/test_table/2023/01/test_table_2023-01-15.parquet",
        "testuser/test_db/test_table/2023/02/test_table_2023-02-15.parquet",
    ];
    let cloud_files: HashMap<_, _> = object_keys
        .iter()
        .map(|key| (key.to_string(), vec![1, 2, 3]))
        .collect();
    let modified_times: HashMap<_, _> = object_keys
        .iter()
        .map(|key| (key.to_string(), Utc::now()))
        .collect();
    let store = MockS3Store { cloud_files, modified_times };
    let db = DatabaseManager::new(&root, 30, "testuser");
    let cloud_mgr = CloudStorageManager::<MockS3Store>::new_with_mock(db, store, Some("test-bucket"));

    let date_range = HashMap::from([("start_date", "2023-01-01"), ("end_date", "2023-01-31")]);
    let outcome = cloud_mgr.cloud_fetch_parquet("testuser", "test_db", "test_table", &date_range).await;

    let _ = std::fs::remove_dir_all(&root);
    assert!(outcome.is_ok());
}
/// A local copy of `test_table_2023-01-15.parquet` already exists in the group directory,
/// and the matching cloud object was last modified one hour ago. After
/// `cloud_fetch_parquet` the local file must still exist and the fetch must succeed.
#[tokio::test]
async fn test_cloud_fetch_parquet_skip_up_to_date() {
    use std::process;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Timestamp + PID + counter keep the scratch directory unique across parallel test runs.
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let counter = COUNTER.fetch_add(1, Ordering::Relaxed);
    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let pid = process::id();
    let storage_path = format!("tmp/timon_test_fetch_skip_{}_{}_{}", timestamp, pid, counter);
    let data_path = format!("{}/data", storage_path);
    let group_path = format!("{}/group/testuser/test_db/test_table", storage_path);
    let base_table_path = format!("{}/test_db/test_table", data_path);
    std::fs::create_dir_all(&data_path).unwrap();
    std::fs::create_dir_all(&group_path).unwrap();

    let metadata = json!({
        "databases": {
            "test_db": {
                "tables": {
                    "test_table": {
                        "path": &base_table_path,
                        "schema": {}
                    }
                }
            }
        }
    });
    std::fs::write(format!("{}/metadata.json", storage_path), serde_json::to_string(&metadata).unwrap()).unwrap();
    std::fs::create_dir_all(&base_table_path).unwrap();

    // Local copy written now, so it is newer than the cloud object below.
    let local_file = format!("{}/test_table_2023-01-15.parquet", group_path);
    std::fs::write(&local_file, vec![1, 2, 3, 4, 5]).unwrap();

    let mut cloud_files = HashMap::new();
    cloud_files.insert(
        "testuser/test_db/test_table/2023/01/test_table_2023-01-15.parquet".to_string(),
        vec![1, 2, 3],
    );
    let mut modified_times = HashMap::new();
    let past_time = Utc::now() - chrono::Duration::hours(1);
    modified_times.insert("testuser/test_db/test_table/2023/01/test_table_2023-01-15.parquet".to_string(), past_time);
    let mock_s3 = MockS3Store { cloud_files, modified_times };
    let db_manager = DatabaseManager::new(&storage_path, 30, "testuser");
    let cloud_mgr = CloudStorageManager::<MockS3Store>::new_with_mock(db_manager, mock_s3, Some("test-bucket"));

    let mut date_range = HashMap::new();
    date_range.insert("start_date", "2023-01-01");
    date_range.insert("end_date", "2023-01-31");
    let result = cloud_mgr.cloud_fetch_parquet("testuser", "test_db", "test_table", &date_range).await;

    // Observe, then clean up, then assert, so a failure does not leak the scratch directory.
    let local_copy_kept = std::path::Path::new(&local_file).exists();
    let _ = std::fs::remove_dir_all(&storage_path);
    assert!(local_copy_kept, "Local file should still exist (up-to-date)");
    assert!(result.is_ok(), "cloud_fetch_parquet should succeed: {:?}", result.err());
}
/// No local copy exists yet, and the cloud object is stamped one hour in the future.
/// After `cloud_fetch_parquet` the file must be present in the group directory and the
/// fetch must succeed.
#[tokio::test]
async fn test_cloud_fetch_parquet_download_outdated() {
    use std::process;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Timestamp + PID + counter keep the scratch directory unique across parallel test runs.
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let counter = COUNTER.fetch_add(1, Ordering::Relaxed);
    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let pid = process::id();
    let storage_path = format!("tmp/timon_test_fetch_download_{}_{}_{}", timestamp, pid, counter);
    let data_path = format!("{}/data", storage_path);
    let group_path = format!("{}/group/testuser/test_db/test_table", storage_path);
    let base_table_path = format!("{}/test_db/test_table", data_path);
    std::fs::create_dir_all(&data_path).unwrap();
    std::fs::create_dir_all(&group_path).unwrap();

    let metadata = json!({
        "databases": {
            "test_db": {
                "tables": {
                    "test_table": {
                        "path": &base_table_path,
                        "schema": {}
                    }
                }
            }
        }
    });
    std::fs::write(format!("{}/metadata.json", storage_path), serde_json::to_string(&metadata).unwrap()).unwrap();
    std::fs::create_dir_all(&base_table_path).unwrap();

    // Expected download destination; intentionally not created up front.
    let local_file = format!("{}/test_table_2023-01-15.parquet", group_path);

    let mut cloud_files = HashMap::new();
    cloud_files.insert(
        "testuser/test_db/test_table/2023/01/test_table_2023-01-15.parquet".to_string(),
        vec![1, 2, 3, 4, 5],
    );
    let mut modified_times = HashMap::new();
    let future_time = Utc::now() + chrono::Duration::hours(1);
    modified_times.insert(
        "testuser/test_db/test_table/2023/01/test_table_2023-01-15.parquet".to_string(),
        future_time,
    );
    let mock_s3 = MockS3Store { cloud_files, modified_times };
    let db_manager = DatabaseManager::new(&storage_path, 30, "testuser");
    let cloud_mgr = CloudStorageManager::<MockS3Store>::new_with_mock(db_manager, mock_s3, Some("test-bucket"));

    let mut date_range = HashMap::new();
    date_range.insert("start_date", "2023-01-01");
    date_range.insert("end_date", "2023-01-31");
    let result = cloud_mgr.cloud_fetch_parquet("testuser", "test_db", "test_table", &date_range).await;

    // Observe, then clean up, then assert, so a failure does not leak the scratch directory.
    let downloaded = std::path::Path::new(&local_file).exists();
    let _ = std::fs::remove_dir_all(&storage_path);
    assert!(downloaded, "File should be downloaded");
    assert!(result.is_ok(), "cloud_fetch_parquet should succeed: {:?}", result.err());
}
/// Sinks a single `partition_date=2023-01-15` partition (one `id` column) into an empty
/// mock store and expects `cloud_sink_parquet` to succeed.
#[tokio::test]
async fn test_process_sink_parquet_file_regex() {
    use datafusion::arrow::array::Int64Array;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::parquet::arrow::ArrowWriter;
    use std::process;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;
    use std::time::{SystemTime, UNIX_EPOCH};

    // Unique scratch root: per-test sequence number, nanosecond timestamp and process id.
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let root = format!("tmp/timon_test_process_regex_{}_{}_{}", nanos, process::id(), seq);
    let data_dir = format!("{}/data", root);
    let table_dir = format!("{}/test_db/test_table", data_dir);
    let partition_dir = format!("{}/partition_date=2023-01-15", table_dir);

    std::fs::create_dir_all(&data_dir).unwrap();
    std::fs::create_dir_all(format!("{}/merge_workspace/testuser", root)).unwrap();
    let metadata = json!({
        "databases": {
            "test_db": {
                "tables": {
                    "test_table": {
                        "path": &table_dir,
                        "schema": {
                            "id": {"type": "int", "unique": true}
                        }
                    }
                }
            }
        }
    });
    std::fs::write(format!("{}/metadata.json", root), serde_json::to_string(&metadata).unwrap()).unwrap();
    std::fs::create_dir_all(&table_dir).unwrap();
    std::fs::create_dir_all(&partition_dir).unwrap();

    // One small parquet file inside the partition directory.
    {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let ids = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_new(schema, vec![ids]).unwrap();
        let sink = std::fs::File::create(format!("{}/data.parquet", partition_dir)).unwrap();
        let mut writer = ArrowWriter::try_new(sink, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
    }

    let store = MockS3Store::empty();
    let db = DatabaseManager::new(&root, 30, "testuser");
    let cloud_mgr = CloudStorageManager::<MockS3Store>::new_with_mock(db, store, Some("test-bucket"));
    let outcome = cloud_mgr.cloud_sink_parquet("test_db", "test_table").await;

    let _ = std::fs::remove_dir_all(&root);
    assert!(outcome.is_ok());
}
/// Sinks a single `partition_date=2023-12-25` partition (one `id` column) into an empty
/// mock store and expects `cloud_sink_parquet` to succeed.
#[tokio::test]
async fn test_process_sink_parquet_file_date_extraction() {
    use datafusion::arrow::array::Int64Array;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::parquet::arrow::ArrowWriter;
    use std::process;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;
    use std::time::{SystemTime, UNIX_EPOCH};

    // Unique scratch root: per-test sequence number, nanosecond timestamp and process id.
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let root = format!("tmp/timon_test_process_date_{}_{}_{}", nanos, process::id(), seq);
    let data_dir = format!("{}/data", root);
    let table_dir = format!("{}/test_db/test_table", data_dir);
    let partition_dir = format!("{}/partition_date=2023-12-25", table_dir);

    std::fs::create_dir_all(&data_dir).unwrap();
    std::fs::create_dir_all(format!("{}/merge_workspace/testuser", root)).unwrap();
    let metadata = json!({
        "databases": {
            "test_db": {
                "tables": {
                    "test_table": {
                        "path": &table_dir,
                        "schema": {
                            "id": {"type": "int", "unique": true}
                        }
                    }
                }
            }
        }
    });
    std::fs::write(format!("{}/metadata.json", root), serde_json::to_string(&metadata).unwrap()).unwrap();
    std::fs::create_dir_all(&table_dir).unwrap();
    std::fs::create_dir_all(&partition_dir).unwrap();

    // One small parquet file inside the partition directory.
    {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let ids = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_new(schema, vec![ids]).unwrap();
        let sink = std::fs::File::create(format!("{}/data.parquet", partition_dir)).unwrap();
        let mut writer = ArrowWriter::try_new(sink, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
    }

    let store = MockS3Store::empty();
    let db = DatabaseManager::new(&root, 30, "testuser");
    let cloud_mgr = CloudStorageManager::<MockS3Store>::new_with_mock(db, store, Some("test-bucket"));
    let outcome = cloud_mgr.cloud_sink_parquet("test_db", "test_table").await;

    let _ = std::fs::remove_dir_all(&root);
    assert!(outcome.is_ok());
}
/// Sinks a single `partition_date=2023-01-15` partition into an empty mock store.
/// `cloud_sink_parquet` must succeed.
#[tokio::test]
async fn test_process_sink_parquet_file_s3_filename() {
    use datafusion::arrow::array::Int64Array;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::parquet::arrow::ArrowWriter;
    use std::process;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;
    use std::time::{SystemTime, UNIX_EPOCH};

    // Unique scratch root: per-test sequence number, nanosecond timestamp and process id.
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let root = format!("tmp/timon_test_process_s3name_{}_{}_{}", nanos, process::id(), seq);
    let data_dir = format!("{}/data", root);
    let table_dir = format!("{}/test_db/test_table", data_dir);
    let partition_dir = format!("{}/partition_date=2023-01-15", table_dir);

    std::fs::create_dir_all(&data_dir).unwrap();
    std::fs::create_dir_all(format!("{}/merge_workspace/testuser", root)).unwrap();
    let metadata = json!({
        "databases": {
            "test_db": {
                "tables": {
                    "test_table": {
                        "path": &table_dir,
                        "schema": {
                            "id": {"type": "int", "unique": true}
                        }
                    }
                }
            }
        }
    });
    std::fs::write(format!("{}/metadata.json", root), serde_json::to_string(&metadata).unwrap()).unwrap();
    std::fs::create_dir_all(&table_dir).unwrap();
    std::fs::create_dir_all(&partition_dir).unwrap();

    // One small parquet file inside the partition directory.
    {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let ids = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_new(schema, vec![ids]).unwrap();
        let sink = std::fs::File::create(format!("{}/data.parquet", partition_dir)).unwrap();
        let mut writer = ArrowWriter::try_new(sink, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
    }

    let store = MockS3Store::empty();
    let db = DatabaseManager::new(&root, 30, "testuser");
    let cloud_mgr = CloudStorageManager::<MockS3Store>::new_with_mock(db, store, Some("test-bucket"));
    let outcome = cloud_mgr.cloud_sink_parquet("test_db", "test_table").await;

    let _ = std::fs::remove_dir_all(&root);
    assert!(outcome.is_ok());
}
/// Sinks a `partition_date=2023-01-15` partition while the mock store holds an object for
/// the January 2023 key stamped one hour in the future. `cloud_sink_parquet` must succeed.
#[tokio::test]
async fn test_process_sink_parquet_file_s3_temp_path() {
    use datafusion::arrow::array::Int64Array;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::parquet::arrow::ArrowWriter;
    use std::process;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;
    use std::time::{SystemTime, UNIX_EPOCH};

    // Unique scratch root: per-test sequence number, nanosecond timestamp and process id.
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let root = format!("tmp/timon_test_process_temppath_{}_{}_{}", nanos, process::id(), seq);
    let data_dir = format!("{}/data", root);
    let table_dir = format!("{}/test_db/test_table", data_dir);
    let partition_dir = format!("{}/partition_date=2023-01-15", table_dir);

    std::fs::create_dir_all(&data_dir).unwrap();
    std::fs::create_dir_all(format!("{}/merge_workspace/testuser", root)).unwrap();
    let metadata = json!({
        "databases": {
            "test_db": {
                "tables": {
                    "test_table": {
                        "path": &table_dir,
                        "schema": {
                            "id": {"type": "int", "unique": true}
                        }
                    }
                }
            }
        }
    });
    std::fs::write(format!("{}/metadata.json", root), serde_json::to_string(&metadata).unwrap()).unwrap();
    std::fs::create_dir_all(&table_dir).unwrap();
    std::fs::create_dir_all(&partition_dir).unwrap();

    // One small parquet file inside the partition directory.
    {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let ids = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_new(schema, vec![ids]).unwrap();
        let sink = std::fs::File::create(format!("{}/data.parquet", partition_dir)).unwrap();
        let mut writer = ArrowWriter::try_new(sink, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
    }

    let s3_key = "testuser/test_db/test_table/2023/01/test_table_2023-01-15.parquet";
    let store = MockS3Store {
        cloud_files: HashMap::from([(s3_key.to_string(), vec![1, 2, 3])]),
        modified_times: HashMap::from([(s3_key.to_string(), Utc::now() + chrono::Duration::hours(1))]),
    };
    let db = DatabaseManager::new(&root, 30, "testuser");
    let cloud_mgr = CloudStorageManager::<MockS3Store>::new_with_mock(db, store, Some("test-bucket"));
    let outcome = cloud_mgr.cloud_sink_parquet("test_db", "test_table").await;

    let _ = std::fs::remove_dir_all(&root);
    assert!(outcome.is_ok());
}
/// Sinks a `partition_date=2023-01-15` partition while the mock store holds an object for
/// the January 2023 key stamped with the current time. `cloud_sink_parquet` must succeed.
#[tokio::test]
async fn test_process_sink_parquet_file_get_local_modified_time() {
    use datafusion::arrow::array::Int64Array;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::parquet::arrow::ArrowWriter;
    use std::process;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;
    use std::time::{SystemTime, UNIX_EPOCH};

    // Unique scratch root: per-test sequence number, nanosecond timestamp and process id.
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let root = format!("tmp/timon_test_process_mtime_{}_{}_{}", nanos, process::id(), seq);
    let data_dir = format!("{}/data", root);
    let table_dir = format!("{}/test_db/test_table", data_dir);
    let partition_dir = format!("{}/partition_date=2023-01-15", table_dir);

    std::fs::create_dir_all(&data_dir).unwrap();
    std::fs::create_dir_all(format!("{}/merge_workspace/testuser", root)).unwrap();
    let metadata = json!({
        "databases": {
            "test_db": {
                "tables": {
                    "test_table": {
                        "path": &table_dir,
                        "schema": {
                            "id": {"type": "int", "unique": true}
                        }
                    }
                }
            }
        }
    });
    std::fs::write(format!("{}/metadata.json", root), serde_json::to_string(&metadata).unwrap()).unwrap();
    std::fs::create_dir_all(&table_dir).unwrap();
    std::fs::create_dir_all(&partition_dir).unwrap();

    // One small parquet file inside the partition directory.
    {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let ids = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_new(schema, vec![ids]).unwrap();
        let sink = std::fs::File::create(format!("{}/data.parquet", partition_dir)).unwrap();
        let mut writer = ArrowWriter::try_new(sink, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
    }

    let s3_key = "testuser/test_db/test_table/2023/01/test_table_2023-01-15.parquet";
    let store = MockS3Store {
        cloud_files: HashMap::from([(s3_key.to_string(), vec![1, 2, 3])]),
        modified_times: HashMap::from([(s3_key.to_string(), Utc::now())]),
    };
    let db = DatabaseManager::new(&root, 30, "testuser");
    let cloud_mgr = CloudStorageManager::<MockS3Store>::new_with_mock(db, store, Some("test-bucket"));
    let outcome = cloud_mgr.cloud_sink_parquet("test_db", "test_table").await;

    let _ = std::fs::remove_dir_all(&root);
    assert!(outcome.is_ok());
}
/// Sinks a `partition_date=2023-01-15` partition when the mock store has no objects at all.
/// `cloud_sink_parquet` must succeed.
#[tokio::test]
async fn test_process_sink_parquet_file_s3_not_exists() {
    use datafusion::arrow::array::Int64Array;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::parquet::arrow::ArrowWriter;
    use std::process;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;
    use std::time::{SystemTime, UNIX_EPOCH};

    // Unique scratch root: per-test sequence number, nanosecond timestamp and process id.
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let root = format!("tmp/timon_test_process_notexists_{}_{}_{}", nanos, process::id(), seq);
    let data_dir = format!("{}/data", root);
    let table_dir = format!("{}/test_db/test_table", data_dir);
    let partition_dir = format!("{}/partition_date=2023-01-15", table_dir);

    std::fs::create_dir_all(&data_dir).unwrap();
    std::fs::create_dir_all(format!("{}/merge_workspace/testuser", root)).unwrap();
    let metadata = json!({
        "databases": {
            "test_db": {
                "tables": {
                    "test_table": {
                        "path": &table_dir,
                        "schema": {
                            "id": {"type": "int", "unique": true}
                        }
                    }
                }
            }
        }
    });
    std::fs::write(format!("{}/metadata.json", root), serde_json::to_string(&metadata).unwrap()).unwrap();
    std::fs::create_dir_all(&table_dir).unwrap();
    std::fs::create_dir_all(&partition_dir).unwrap();

    // One small parquet file inside the partition directory.
    {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let ids = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_new(schema, vec![ids]).unwrap();
        let sink = std::fs::File::create(format!("{}/data.parquet", partition_dir)).unwrap();
        let mut writer = ArrowWriter::try_new(sink, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
    }

    let store = MockS3Store::empty();
    let db = DatabaseManager::new(&root, 30, "testuser");
    let cloud_mgr = CloudStorageManager::<MockS3Store>::new_with_mock(db, store, Some("test-bucket"));
    let outcome = cloud_mgr.cloud_sink_parquet("test_db", "test_table").await;

    let _ = std::fs::remove_dir_all(&root);
    assert!(outcome.is_ok());
}
#[tokio::test]
async fn test_process_sink_parquet_file_local_newer_than_s3() {
    // The local partition file is newer than its S3 counterpart (S3 mtime is
    // set one hour in the past); the sink is expected to succeed.
    use datafusion::arrow::array::Int64Array;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::parquet::arrow::ArrowWriter;
    use std::process;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Deletes the scratch directory on drop, so cleanup also happens when a
    /// setup `unwrap()` or the call under test panics.
    struct ScratchDir(String);
    impl Drop for ScratchDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Writes a parquet stream with one batch holding a non-null Int64 `id` column.
    fn write_id_parquet<W: std::io::Write + Send>(sink: W, ids: Vec<i64>) {
        let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
        let id_array = Arc::new(Int64Array::from(ids));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![id_array]).unwrap();
        let mut writer = ArrowWriter::try_new(sink, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
    }

    // Unique scratch root: nanosecond timestamp + pid + per-test counter.
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let counter = COUNTER.fetch_add(1, Ordering::Relaxed);
    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let pid = process::id();
    let storage_path = format!("tmp/timon_test_process_newer_{}_{}_{}", timestamp, pid, counter);
    let _cleanup = ScratchDir(storage_path.clone());

    let data_path = format!("{}/data", storage_path);
    let merge_path = format!("{}/merge_workspace/testuser", storage_path);
    std::fs::create_dir_all(&data_path).unwrap();
    std::fs::create_dir_all(&merge_path).unwrap();
    let metadata = json!({
        "databases": {
            "test_db": {
                "tables": {
                    "test_table": {
                        "path": format!("{}/test_db/test_table", data_path),
                        "schema": {
                            "id": {"type": "int", "unique": true}
                        }
                    }
                }
            }
        }
    });
    std::fs::write(
        format!("{}/metadata.json", storage_path),
        serde_json::to_string(&metadata).unwrap(),
    )
    .unwrap();

    // create_dir_all also creates the parent table directory.
    let table_path = format!("{}/test_db/test_table/partition_date=2023-01-15", data_path);
    std::fs::create_dir_all(&table_path).unwrap();
    let parquet_file = format!("{}/data.parquet", table_path);
    write_id_parquet(std::fs::File::create(&parquet_file).unwrap(), vec![10, 20, 30]);

    // S3 copy of the same partition, last modified one hour ago.
    let s3_path = "testuser/test_db/test_table/2023/01/test_table_2023-01-15.parquet";
    let mut s3_data: Vec<u8> = Vec::new();
    write_id_parquet(&mut s3_data, vec![1, 2, 3]);
    let mut cloud_files = HashMap::new();
    cloud_files.insert(s3_path.to_string(), s3_data);
    let mut modified_times = HashMap::new();
    let past_time = Utc::now() - chrono::Duration::hours(1);
    modified_times.insert(s3_path.to_string(), past_time);
    let mock_s3 = MockS3Store { cloud_files, modified_times };

    let db_manager = DatabaseManager::new(&storage_path, 30, "testuser");
    let cloud_mgr =
        CloudStorageManager::<MockS3Store>::new_with_mock(db_manager, mock_s3, Some("test-bucket"));
    let result = cloud_mgr.cloud_sink_parquet("test_db", "test_table").await;
    assert!(result.is_ok(), "cloud_sink_parquet failed: {:?}", result.err());
}
#[tokio::test]
async fn test_process_sink_parquet_file_read_local_batches() {
    // Local file (ids 10, 20) plus an older S3 copy (ids 1, 2) of the same
    // partition; the sink is expected to succeed.
    use datafusion::arrow::array::Int64Array;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::parquet::arrow::ArrowWriter;
    use std::process;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Deletes the scratch directory on drop, so cleanup also happens when a
    /// setup `unwrap()` or the call under test panics.
    struct ScratchDir(String);
    impl Drop for ScratchDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Writes a parquet stream with one batch holding a non-null Int64 `id` column.
    fn write_id_parquet<W: std::io::Write + Send>(sink: W, ids: Vec<i64>) {
        let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
        let id_array = Arc::new(Int64Array::from(ids));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![id_array]).unwrap();
        let mut writer = ArrowWriter::try_new(sink, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
    }

    // Unique scratch root: nanosecond timestamp + pid + per-test counter.
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let counter = COUNTER.fetch_add(1, Ordering::Relaxed);
    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let pid = process::id();
    let storage_path =
        format!("tmp/timon_test_process_readlocal_{}_{}_{}", timestamp, pid, counter);
    let _cleanup = ScratchDir(storage_path.clone());

    let data_path = format!("{}/data", storage_path);
    let merge_path = format!("{}/merge_workspace/testuser", storage_path);
    std::fs::create_dir_all(&data_path).unwrap();
    std::fs::create_dir_all(&merge_path).unwrap();
    let metadata = json!({
        "databases": {
            "test_db": {
                "tables": {
                    "test_table": {
                        "path": format!("{}/test_db/test_table", data_path),
                        "schema": {
                            "id": {"type": "int", "unique": true}
                        }
                    }
                }
            }
        }
    });
    std::fs::write(
        format!("{}/metadata.json", storage_path),
        serde_json::to_string(&metadata).unwrap(),
    )
    .unwrap();

    // create_dir_all also creates the parent table directory.
    let table_path = format!("{}/test_db/test_table/partition_date=2023-01-15", data_path);
    std::fs::create_dir_all(&table_path).unwrap();
    let parquet_file = format!("{}/data.parquet", table_path);
    write_id_parquet(std::fs::File::create(&parquet_file).unwrap(), vec![10, 20]);

    // S3 copy of the same partition, last modified one hour ago.
    let s3_path = "testuser/test_db/test_table/2023/01/test_table_2023-01-15.parquet";
    let mut s3_data: Vec<u8> = Vec::new();
    write_id_parquet(&mut s3_data, vec![1, 2]);
    let mut cloud_files = HashMap::new();
    cloud_files.insert(s3_path.to_string(), s3_data);
    let mut modified_times = HashMap::new();
    let past_time = Utc::now() - chrono::Duration::hours(1);
    modified_times.insert(s3_path.to_string(), past_time);
    let mock_s3 = MockS3Store { cloud_files, modified_times };

    let db_manager = DatabaseManager::new(&storage_path, 30, "testuser");
    let cloud_mgr =
        CloudStorageManager::<MockS3Store>::new_with_mock(db_manager, mock_s3, Some("test-bucket"));
    let result = cloud_mgr.cloud_sink_parquet("test_db", "test_table").await;
    assert!(result.is_ok(), "cloud_sink_parquet failed: {:?}", result.err());
}
#[tokio::test]
async fn test_process_sink_parquet_file_merge_batches() {
    // Local file (id 10) plus an older S3 copy (id 1) of the same partition;
    // the sink is expected to succeed.
    use datafusion::arrow::array::Int64Array;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::parquet::arrow::ArrowWriter;
    use std::process;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Deletes the scratch directory on drop, so cleanup also happens when a
    /// setup `unwrap()` or the call under test panics.
    struct ScratchDir(String);
    impl Drop for ScratchDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Writes a parquet stream with one batch holding a non-null Int64 `id` column.
    fn write_id_parquet<W: std::io::Write + Send>(sink: W, ids: Vec<i64>) {
        let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
        let id_array = Arc::new(Int64Array::from(ids));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![id_array]).unwrap();
        let mut writer = ArrowWriter::try_new(sink, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
    }

    // Unique scratch root: nanosecond timestamp + pid + per-test counter.
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let counter = COUNTER.fetch_add(1, Ordering::Relaxed);
    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let pid = process::id();
    let storage_path = format!("tmp/timon_test_process_merge_{}_{}_{}", timestamp, pid, counter);
    let _cleanup = ScratchDir(storage_path.clone());

    let data_path = format!("{}/data", storage_path);
    let merge_path = format!("{}/merge_workspace/testuser", storage_path);
    std::fs::create_dir_all(&data_path).unwrap();
    std::fs::create_dir_all(&merge_path).unwrap();
    let metadata = json!({
        "databases": {
            "test_db": {
                "tables": {
                    "test_table": {
                        "path": format!("{}/test_db/test_table", data_path),
                        "schema": {
                            "id": {"type": "int", "unique": true}
                        }
                    }
                }
            }
        }
    });
    std::fs::write(
        format!("{}/metadata.json", storage_path),
        serde_json::to_string(&metadata).unwrap(),
    )
    .unwrap();

    // create_dir_all also creates the parent table directory.
    let table_path = format!("{}/test_db/test_table/partition_date=2023-01-15", data_path);
    std::fs::create_dir_all(&table_path).unwrap();
    let parquet_file = format!("{}/data.parquet", table_path);
    write_id_parquet(std::fs::File::create(&parquet_file).unwrap(), vec![10]);

    // S3 copy of the same partition, last modified one hour ago.
    let s3_path = "testuser/test_db/test_table/2023/01/test_table_2023-01-15.parquet";
    let mut s3_data: Vec<u8> = Vec::new();
    write_id_parquet(&mut s3_data, vec![1]);
    let mut cloud_files = HashMap::new();
    cloud_files.insert(s3_path.to_string(), s3_data);
    let mut modified_times = HashMap::new();
    let past_time = Utc::now() - chrono::Duration::hours(1);
    modified_times.insert(s3_path.to_string(), past_time);
    let mock_s3 = MockS3Store { cloud_files, modified_times };

    let db_manager = DatabaseManager::new(&storage_path, 30, "testuser");
    let cloud_mgr =
        CloudStorageManager::<MockS3Store>::new_with_mock(db_manager, mock_s3, Some("test-bucket"));
    let result = cloud_mgr.cloud_sink_parquet("test_db", "test_table").await;
    assert!(result.is_ok(), "cloud_sink_parquet failed: {:?}", result.err());
}
#[tokio::test]
async fn test_process_sink_parquet_file_local_older_than_s3() {
    // The S3 object's mtime is set one hour in the future, so the local file
    // is older; the sink is expected to succeed.
    use datafusion::arrow::array::Int64Array;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::parquet::arrow::ArrowWriter;
    use std::process;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Deletes the scratch directory on drop, so cleanup also happens when a
    /// setup `unwrap()` or the call under test panics.
    struct ScratchDir(String);
    impl Drop for ScratchDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Writes a parquet stream with one batch holding a non-null Int64 `id` column.
    fn write_id_parquet<W: std::io::Write + Send>(sink: W, ids: Vec<i64>) {
        let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
        let id_array = Arc::new(Int64Array::from(ids));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![id_array]).unwrap();
        let mut writer = ArrowWriter::try_new(sink, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
    }

    // Unique scratch root: nanosecond timestamp + pid + per-test counter.
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let counter = COUNTER.fetch_add(1, Ordering::Relaxed);
    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let pid = process::id();
    let storage_path = format!("tmp/timon_test_process_older_{}_{}_{}", timestamp, pid, counter);
    let _cleanup = ScratchDir(storage_path.clone());

    let data_path = format!("{}/data", storage_path);
    let merge_path = format!("{}/merge_workspace/testuser", storage_path);
    std::fs::create_dir_all(&data_path).unwrap();
    std::fs::create_dir_all(&merge_path).unwrap();
    let metadata = json!({
        "databases": {
            "test_db": {
                "tables": {
                    "test_table": {
                        "path": format!("{}/test_db/test_table", data_path),
                        "schema": {
                            "id": {"type": "int", "unique": true}
                        }
                    }
                }
            }
        }
    });
    std::fs::write(
        format!("{}/metadata.json", storage_path),
        serde_json::to_string(&metadata).unwrap(),
    )
    .unwrap();

    // create_dir_all also creates the parent table directory.
    let table_path = format!("{}/test_db/test_table/partition_date=2023-01-15", data_path);
    std::fs::create_dir_all(&table_path).unwrap();
    let parquet_file = format!("{}/data.parquet", table_path);
    write_id_parquet(std::fs::File::create(&parquet_file).unwrap(), vec![1, 2, 3]);

    // NOTE(review): these S3 bytes are not a valid parquet file; presumably the
    // newer-S3 path never parses them — confirm against cloud_sink_parquet.
    let s3_path = "testuser/test_db/test_table/2023/01/test_table_2023-01-15.parquet";
    let mut cloud_files = HashMap::new();
    cloud_files.insert(s3_path.to_string(), vec![1, 2, 3, 4, 5]);
    let mut modified_times = HashMap::new();
    let future_time = Utc::now() + chrono::Duration::hours(1);
    modified_times.insert(s3_path.to_string(), future_time);
    let mock_s3 = MockS3Store { cloud_files, modified_times };

    let db_manager = DatabaseManager::new(&storage_path, 30, "testuser");
    let cloud_mgr =
        CloudStorageManager::<MockS3Store>::new_with_mock(db_manager, mock_s3, Some("test-bucket"));
    let result = cloud_mgr.cloud_sink_parquet("test_db", "test_table").await;
    assert!(result.is_ok(), "cloud_sink_parquet failed: {:?}", result.err());
}
#[tokio::test]
async fn test_upload_merged_batches() {
    // Local file (id 10) plus an older S3 copy (id 1) of the same partition;
    // cloud_sink_parquet is expected to succeed, covering the upload of the
    // merged result.
    use datafusion::arrow::array::Int64Array;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::parquet::arrow::ArrowWriter;
    use std::process;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Deletes the scratch directory on drop, so cleanup also happens when a
    /// setup `unwrap()` or the call under test panics.
    struct ScratchDir(String);
    impl Drop for ScratchDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Writes a parquet stream with one batch holding a non-null Int64 `id` column.
    fn write_id_parquet<W: std::io::Write + Send>(sink: W, ids: Vec<i64>) {
        let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
        let id_array = Arc::new(Int64Array::from(ids));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![id_array]).unwrap();
        let mut writer = ArrowWriter::try_new(sink, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
    }

    // Unique scratch root: nanosecond timestamp + pid + per-test counter.
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let counter = COUNTER.fetch_add(1, Ordering::Relaxed);
    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let pid = process::id();
    let storage_path = format!("tmp/timon_test_upload_merged_{}_{}_{}", timestamp, pid, counter);
    let _cleanup = ScratchDir(storage_path.clone());

    let data_path = format!("{}/data", storage_path);
    let merge_path = format!("{}/merge_workspace/testuser", storage_path);
    std::fs::create_dir_all(&data_path).unwrap();
    std::fs::create_dir_all(&merge_path).unwrap();
    let metadata = json!({
        "databases": {
            "test_db": {
                "tables": {
                    "test_table": {
                        "path": format!("{}/test_db/test_table", data_path),
                        "schema": {
                            "id": {"type": "int", "unique": true}
                        }
                    }
                }
            }
        }
    });
    std::fs::write(
        format!("{}/metadata.json", storage_path),
        serde_json::to_string(&metadata).unwrap(),
    )
    .unwrap();

    // create_dir_all also creates the parent table directory.
    let table_path = format!("{}/test_db/test_table/partition_date=2023-01-15", data_path);
    std::fs::create_dir_all(&table_path).unwrap();
    let parquet_file = format!("{}/data.parquet", table_path);
    write_id_parquet(std::fs::File::create(&parquet_file).unwrap(), vec![10]);

    // S3 copy of the same partition, last modified one hour ago.
    let s3_path = "testuser/test_db/test_table/2023/01/test_table_2023-01-15.parquet";
    let mut s3_data: Vec<u8> = Vec::new();
    write_id_parquet(&mut s3_data, vec![1]);
    let mut cloud_files = HashMap::new();
    cloud_files.insert(s3_path.to_string(), s3_data);
    let mut modified_times = HashMap::new();
    let past_time = Utc::now() - chrono::Duration::hours(1);
    modified_times.insert(s3_path.to_string(), past_time);
    let mock_s3 = MockS3Store { cloud_files, modified_times };

    let db_manager = DatabaseManager::new(&storage_path, 30, "testuser");
    let cloud_mgr =
        CloudStorageManager::<MockS3Store>::new_with_mock(db_manager, mock_s3, Some("test-bucket"));
    let result = cloud_mgr.cloud_sink_parquet("test_db", "test_table").await;
    assert!(result.is_ok(), "cloud_sink_parquet failed: {:?}", result.err());
}
#[tokio::test]
async fn test_download_from_bucket_not_found() {
    // Downloading a key that is absent from an empty mock bucket is expected
    // to return Ok.
    use std::process;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Deletes the scratch directory on drop, so cleanup also happens when a
    /// setup `unwrap()` or the call under test panics.
    struct ScratchDir(String);
    impl Drop for ScratchDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    // Unique scratch root: nanosecond timestamp + pid + per-test counter.
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let counter = COUNTER.fetch_add(1, Ordering::Relaxed);
    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let pid = process::id();
    let storage_path = format!("tmp/timon_test_download_nf_{}_{}_{}", timestamp, pid, counter);
    let _cleanup = ScratchDir(storage_path.clone());

    let data_path = format!("{}/data", storage_path);
    std::fs::create_dir_all(&data_path).unwrap();
    let metadata = json!({
        "databases": {}
    });
    std::fs::write(
        format!("{}/metadata.json", storage_path),
        serde_json::to_string(&metadata).unwrap(),
    )
    .unwrap();

    let mock_s3 = MockS3Store::empty();
    let db_manager = DatabaseManager::new(&storage_path, 30, "testuser");
    let cloud_mgr =
        CloudStorageManager::<MockS3Store>::new_with_mock(db_manager, mock_s3, Some("test-bucket"));
    // The temp file must outlive `download_path`, which borrows its path.
    let temp_file = NamedTempFile::new().unwrap();
    let download_path = temp_file.path().to_str().unwrap();
    let result = cloud_mgr.download_from_bucket("nonexistent/file.parquet", download_path).await;
    assert!(result.is_ok(), "download_from_bucket failed: {:?}", result.err());
}
#[tokio::test]
async fn test_s3_store_interface_error_handling_attempt() {
    // Best-effort exercise of the AmazonS3-backed store against an unreachable
    // endpoint with bogus credentials. Construction may return Err or panic
    // (both tolerated). If it succeeds, the head/get/put results are ignored.
    use std::process;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Deletes the scratch directory on drop, so cleanup also happens when a
    /// setup `unwrap()` or a store call panics.
    struct ScratchDir(String);
    impl Drop for ScratchDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    // Unique scratch root: timestamp + pid + counter, matching the other tests
    // (a bare timestamp can collide across concurrently running test processes).
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let counter = COUNTER.fetch_add(1, Ordering::Relaxed);
    let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
    let pid = process::id();
    let storage_path = format!("tmp/timon_test_s3_error_{}_{}_{}", timestamp, pid, counter);
    let _cleanup = ScratchDir(storage_path.clone());

    let data_path = format!("{}/data", storage_path);
    std::fs::create_dir_all(&data_path).unwrap();
    let metadata = json!({
        "databases": {}
    });
    std::fs::write(
        format!("{}/metadata.json", storage_path),
        serde_json::to_string(&metadata).unwrap(),
    )
    .unwrap();

    let db_manager = DatabaseManager::new(&storage_path, 30, "testuser");
    let cloud_mgr_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        CloudStorageManager::<object_store::aws::AmazonS3>::new(
            db_manager,
            "http://127.0.0.1:65535",
            "invalid_key",
            "invalid_secret",
            "invalid_bucket",
            "us-west-1",
        )
    }));
    if let Ok(Ok(cloud_mgr)) = cloud_mgr_result {
        use object_store::path::Path as StorePath;
        let test_path = StorePath::from("nonexistent/path/file.parquet");
        let _ = cloud_mgr.s3_store.store_head(&test_path).await;
        let _ = cloud_mgr.s3_store.store_get(&test_path).await;
        let _ = cloud_mgr.s3_store.store_put(&test_path, bytes::Bytes::from("test")).await;
    }
}
#[tokio::test]
async fn test_cloud_sync_parquet_filter_files_by_date_range_line284() {
    // Calls cloud_sync_parquet with a window spanning all of 2023. Per the test
    // name, this targets the date-range file filter; the outcome is discarded.
    let manager = setup_test_environment();
    let window = HashMap::from([("start_date", "2023-01-01"), ("end_date", "2023-12-31")]);
    let _ = manager
        .cloud_sync_parquet("test_db", "test_table", &window, Some("testuser"))
        .await;
}
#[tokio::test]
async fn test_cloud_sync_parquet_continue_paths_lines315_324_326() {
    // Calls cloud_sync_parquet over the whole of 2023 for "testuser"; per the
    // test name, this targets the loop's `continue` branches. The result is
    // intentionally ignored.
    let manager = setup_test_environment();
    let window = HashMap::from([("start_date", "2023-01-01"), ("end_date", "2023-12-31")]);
    let _ = manager
        .cloud_sync_parquet("test_db", "test_table", &window, Some("testuser"))
        .await;
}
#[tokio::test]
async fn test_process_sink_parquet_file_regex_creation_line356() {
    // Injects a file named `file.parquet` holding the bytes "test" (not valid
    // parquet) into a partition directory of the shared fixture tree, then runs
    // cloud_sink_parquet. The outcome is ignored.

    /// Removes the injected partition directory on drop. Cleanup therefore also
    /// happens if cloud_sink_parquet panics, so the junk file cannot linger in
    /// the shared `tmp/timon_test` tree used by other tests.
    struct PartitionCleanup(&'static str);
    impl Drop for PartitionCleanup {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(self.0);
        }
    }

    const PARTITION_DIR: &str = "tmp/timon_test/data/test_db/test_table/partition_date=2023-01-15";

    let cloud_mgr = setup_test_environment();
    let test_file = format!("{}/file.parquet", PARTITION_DIR);
    let _cleanup = PartitionCleanup(PARTITION_DIR);
    std::fs::create_dir_all(PARTITION_DIR).unwrap();
    std::fs::write(&test_file, b"test").unwrap();
    let _ = cloud_mgr.cloud_sink_parquet("test_db", "test_table").await;
}
#[tokio::test]
async fn test_cloud_sink_parquet_store_head_error_line379() {
    // Runs cloud_sink_parquet against the default test environment. Per the
    // test name, this targets the store_head error branch; the outcome is
    // deliberately discarded.
    let manager = setup_test_environment();
    let _ = manager.cloud_sink_parquet("test_db", "test_table").await;
}
#[tokio::test]
async fn test_cloud_fetch_parquet_error_not_notfound_line493() {
    // Calls cloud_fetch_parquet for "testuser" over the whole of 2023. Per the
    // test name, this targets the non-NotFound error branch; the result is
    // ignored.
    let manager = setup_test_environment();
    let window = HashMap::from([("start_date", "2023-01-01"), ("end_date", "2023-12-31")]);
    let _ = manager
        .cloud_fetch_parquet("testuser", "test_db", "test_table", &window)
        .await;
}