use super::errors::TimonError;
use super::helpers::{
build_rules_tree, get_property_fields, infer_schema_with_coercion, json_to_arrow, record_batches_to_json, rounded_timestamp, row_to_json,
};
use super::sql_query_parser::extract_table_names_and_ctes;
use chrono::{NaiveDateTime, TimeZone, Utc};
use datafusion::arrow::array::Array;
use datafusion::arrow::datatypes::{DataType, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl};
use datafusion::datasource::MemTable;
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::parquet::file::reader::{FileReader, SerializedFileReader};
use datafusion::prelude::*;
use fs2::FileExt;
use futures::executor;
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::collections::HashMap;
use std::error::Error;
use std::fs::File;
use std::path::Path;
use std::sync::{Arc, Mutex, OnceLock, RwLock};
use std::time::{Duration, Instant};
use std::{fmt, fs};
use tokio::io::Result as TokioResult;
// Minimum time between two sweeps of the in-process file-lock registry
// (checked in `atomic_file_insert` before each insert).
const LOCK_CLEANUP_INTERVAL: Duration = Duration::from_secs(3600);
// A lock entry whose last access is at least this old is dropped by `cleanup_unused_locks`.
const LOCK_CLEANUP_THRESHOLD: Duration = Duration::from_secs(3600);
// `.tmp` files under the data directory older than this are deleted at startup.
const ORPHANED_TEMP_FILE_AGE_THRESHOLD: Duration = Duration::from_secs(3600);
// NOTE(review): the constants below are not referenced in the visible part of this file;
// presumably they tune retry counts / delays for metadata read, save and lock
// operations and the number of metadata backups kept — confirm against their users.
const METADATA_READ_MAX_RETRIES: usize = 10;
const METADATA_READ_RETRY_DELAY: Duration = Duration::from_millis(50);
const METADATA_SAVE_MAX_RETRIES: usize = 5;
const METADATA_SAVE_RETRY_DELAY: Duration = Duration::from_millis(100);
const METADATA_LOCK_MAX_RETRIES: usize = 5;
const METADATA_LOCK_RETRY_DELAY: Duration = Duration::from_millis(100);
const MAX_METADATA_BACKUPS: usize = 5;
/// Kind of identifier being validated; used to prefix validation error messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NameType {
    Database,
    Table,
}

impl fmt::Display for NameType {
    /// Writes the human-readable label (`"Database"` or `"Table"`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            NameType::Database => "Database",
            NameType::Table => "Table",
        };
        f.write_str(label)
    }
}
/// One slot of the in-process lock registry: the mutex guarding a file path
/// and the instant the slot was last handed out.
struct FileLockEntry {
    lock: Arc<Mutex<()>>,
    last_accessed: Instant,
}

/// Returns the process-wide registry of per-file locks, keyed by file path.
///
/// The registry is created empty on first use and lives for the rest of the process.
fn get_file_locks() -> &'static Mutex<HashMap<String, FileLockEntry>> {
    static REGISTRY: OnceLock<Mutex<HashMap<String, FileLockEntry>>> = OnceLock::new();
    REGISTRY.get_or_init(Default::default)
}
/// Drops every registry entry that has not been accessed for at least
/// `LOCK_CLEANUP_THRESHOLD`, measured against a single "now" taken on entry.
fn cleanup_unused_locks(locks: &mut HashMap<String, FileLockEntry>) {
    let reference = Instant::now();
    locks.retain(|_, entry| {
        let idle_for = reference.duration_since(entry.last_accessed);
        idle_for < LOCK_CLEANUP_THRESHOLD
    });
}
/// Result of `DatabaseManager::query`, in one of two shapes chosen by the caller.
pub enum DataFusionOutput {
/// Query result already converted to a JSON value.
Json(Value),
/// Query result exposed as a DataFusion `DataFrame`.
DataFrame(DataFrame),
}
impl fmt::Debug for DataFusionOutput {
    /// Formats the output for diagnostics.
    ///
    /// * `Json` prints as `Json(<value>)`.
    /// * `DataFrame` is executed and each resulting batch is printed on its own
    ///   line, but only when no Tokio runtime is active on the current thread.
    ///
    /// When called from inside an async context, starting a nested runtime and
    /// blocking on it would panic, so in that case the frame is NOT executed and
    /// its logical plan is printed instead.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DataFusionOutput::Json(s) => write!(f, "Json({})", s),
            DataFusionOutput::DataFrame(df) => {
                // `Runtime::block_on` (and dropping a runtime) panics on a thread that is
                // already driving async tasks; fall back to a non-executing rendering.
                if tokio::runtime::Handle::try_current().is_ok() {
                    return write!(
                        f,
                        "DataFrame(<not executed inside async context>)\n{}",
                        df.logical_plan().display_indent()
                    );
                }
                let runtime = match tokio::runtime::Runtime::new() {
                    Ok(rt) => rt,
                    Err(e) => {
                        return write!(f, "DataFrame(<Failed to create runtime: {}>)", e);
                    }
                };
                let batches = match runtime.block_on(async { df.clone().collect().await }) {
                    Ok(batches) => batches,
                    Err(e) => {
                        return write!(f, "DataFrame(<Failed to collect DataFrame results: {:?}>)", e);
                    }
                };
                for batch in batches {
                    writeln!(f, "{:?}", batch)?;
                }
                Ok(())
            }
        }
    }
}
/// Root of the metadata document persisted as `metadata.json`.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct Metadata {
/// Known databases, keyed by database name.
databases: HashMap<String, Database>, }
/// Metadata entry for a single database.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct Database {
/// Tables of this database, keyed by table name.
tables: HashMap<String, Table>, }
/// Metadata entry for a single table.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct Table {
// `path`: directory holding the table's data (set to `<data_path>/<db>/<table>` by `create_table`).
// `schema`: the JSON schema supplied at table creation.
path: String, schema: serde_json::Value, }
/// Serializable list of names.
// NOTE(review): not referenced in the visible part of the file; presumably a
// listing payload for database names — confirm against its users.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct DatabaseInfo {
names: Vec<String>,
}
/// In-memory cache slot for the metadata document; both fields start as `None`.
struct MetadataCache {
/// Cached copy of the metadata, if one has been stored.
metadata: Option<Metadata>,
// NOTE(review): presumably the instant the cached copy was stored, compared
// against `cache_ttl` — the code that reads it is outside this view.
timestamp: Option<Instant>,
}
/// Manages databases and tables stored under a single storage directory.
pub struct DatabaseManager {
/// Root storage directory passed to `new`.
pub storage_path: String,
/// User name passed to `new`.
pub username: String,
/// Working copy of the metadata; reloaded at the start of most operations.
metadata: Metadata,
/// `<storage_path>/data` — parent of all database directories.
data_path: String,
/// `<storage_path>/metadata.json`.
metadata_path: String,
/// Bucket size handed to `rounded_timestamp` when computing a record's partition.
bucket_interval: u32,
/// DataFusion session used to register tables and run SQL.
session_context: SessionContext,
/// Metadata cache; the same `Arc` is shared by clones of this manager.
cache: Arc<RwLock<MetadataCache>>,
/// Cache time-to-live; `new` sets it to `Duration::MAX`.
cache_ttl: Duration,
}
impl Clone for DatabaseManager {
fn clone(&self) -> Self {
Self {
storage_path: self.storage_path.clone(),
username: self.username.clone(),
metadata: self.metadata.clone(),
data_path: self.data_path.clone(),
metadata_path: self.metadata_path.clone(),
bucket_interval: self.bucket_interval,
session_context: SessionContext::new(), cache: Arc::clone(&self.cache), cache_ttl: self.cache_ttl,
}
}
}
impl DatabaseManager {
/// Creates a manager rooted at `storage_path`.
///
/// Steps, in order (failures are logged to stderr, never returned):
/// 1. create `<storage_path>/data`;
/// 2. if `<storage_path>/metadata.json` is missing, create it with an empty document;
/// 3. load the metadata file, falling back to empty metadata when it cannot be
///    read or parsed;
/// 4. call `update_metadata` and remove orphaned temp files.
pub fn new(storage_path: &str, bucket_interval: u32, username: &str) -> Self {
    let data_path = format!("{}/data", storage_path);
    let metadata_path = format!("{}/metadata.json", storage_path);
    let empty_metadata = || Metadata { databases: HashMap::new() };

    if let Err(e) = fs::create_dir_all(&data_path) {
        eprintln!(
            "Error: Failed to create critical data directory {}: {}. Database operations may fail.",
            data_path, e
        );
    }

    // Seed the metadata file on first start.
    if !Path::new(&metadata_path).exists() {
        match fs::File::create(&metadata_path) {
            Err(e) => eprintln!("Error creating metadata file: {}", e),
            Ok(_) => match serde_json::to_string(&empty_metadata()) {
                Err(e) => {
                    eprintln!("Error serializing initial metadata: {}", e);
                }
                Ok(metadata_json) => {
                    if let Err(e) = fs::write(&metadata_path, metadata_json) {
                        eprintln!("Error writing initial metadata to file: {}", e);
                    }
                }
            },
        }
    }

    // Load whatever is on disk; any failure degrades to an empty document.
    let on_disk = Path::new(&metadata_path)
        .exists()
        .then(|| fs::read_to_string(&metadata_path));
    let metadata: Metadata = match on_disk {
        None => empty_metadata(),
        Some(Err(e)) => {
            eprintln!("Warning: Failed to read metadata file, using empty metadata: {}", e);
            empty_metadata()
        }
        Some(Ok(file_content)) => match serde_json::from_str(&file_content) {
            Ok(parsed) => parsed,
            Err(e) => {
                eprintln!("Warning: Failed to parse metadata file, using empty metadata: {}", e);
                empty_metadata()
            }
        },
    };

    let mut db_manager = DatabaseManager {
        storage_path: storage_path.to_string(),
        username: username.to_string(),
        metadata,
        data_path,
        metadata_path,
        bucket_interval,
        session_context: SessionContext::new(),
        cache: Arc::new(RwLock::new(MetadataCache {
            metadata: None,
            timestamp: None,
        })),
        cache_ttl: Duration::MAX,
    };
    if let Err(e) = db_manager.update_metadata(storage_path) {
        eprintln!("Error updating metadata: {}", e);
    }
    db_manager.cleanup_orphaned_temp_files();
    db_manager
}
/// Recursively deletes `*.tmp` files under the data directory whose
/// modification time is older than `ORPHANED_TEMP_FILE_AGE_THRESHOLD`.
///
/// Best effort: unreadable directories, unreadable file metadata and failed
/// removals are logged to stderr and skipped. Does nothing when the data
/// directory does not exist.
fn cleanup_orphaned_temp_files(&self) {
    /// Walks `dir` depth-first and returns how many stale temp files were removed.
    fn walk_and_cleanup(dir: &Path, threshold: Duration) -> usize {
        use std::time::SystemTime;

        let entries = match fs::read_dir(dir) {
            Ok(entries) => entries,
            Err(e) => {
                eprintln!("Warning: Failed to read directory {:?}: {}", dir, e);
                return 0;
            }
        };

        let mut removed = 0;
        for entry in entries.flatten() {
            let path = entry.path();
            if path.is_dir() {
                removed += walk_and_cleanup(&path, threshold);
                continue;
            }
            if !path.is_file() {
                continue;
            }
            let is_tmp = path.extension().map_or(false, |ext| ext == "tmp");
            if !is_tmp {
                continue;
            }
            let file_meta = match fs::metadata(&path) {
                Ok(file_meta) => file_meta,
                Err(e) => {
                    eprintln!("Warning: Failed to get metadata for {:?}: {}", path, e);
                    continue;
                }
            };
            let Ok(modified) = file_meta.modified() else { continue };
            // A modification time in the future yields Err and the file is left alone.
            let Ok(age) = SystemTime::now().duration_since(modified) else { continue };
            if age <= threshold {
                continue;
            }
            match fs::remove_file(&path) {
                Ok(()) => removed += 1,
                Err(e) => eprintln!("Warning: Failed to remove orphaned temp file {:?}: {}", path, e),
            }
        }
        removed
    }

    let root = Path::new(&self.data_path);
    if !root.exists() {
        return;
    }
    let cleaned = walk_and_cleanup(root, ORPHANED_TEMP_FILE_AGE_THRESHOLD);
    if cleaned > 0 {
        eprintln!("Cleaned up {} orphaned temporary file(s) on startup", cleaned);
    }
}
/// Returns `true` when `name` consists solely of ASCII letters, ASCII digits
/// and underscores. The empty string also yields `true`.
#[inline]
fn name_matches_pattern(name: &str) -> bool {
    // Every byte of a non-ASCII character is >= 0x80, so a byte-wise scan
    // rejects exactly the same inputs as a char-wise one.
    name.bytes().all(|b| b == b'_' || b.is_ascii_alphanumeric())
}
/// Checks that `name` is a non-empty identifier made of `A-Z`, `a-z`, `0-9` and `_`.
///
/// `name_type` only affects the wording of the error. The check uses a regex
/// compiled once per process; if compilation ever fails, the hand-written
/// `name_matches_pattern` is used instead.
///
/// # Errors
/// `DataFusionError::Plan` when the name is empty or contains any other character.
fn validate_name(name: &str, name_type: NameType) -> Result<(), DataFusionError> {
    if name.is_empty() {
        return Err(DataFusionError::Plan(format!("{} name cannot be empty", name_type)));
    }

    static VALID_PATTERN: OnceLock<Option<Regex>> = OnceLock::new();
    let compiled = VALID_PATTERN.get_or_init(|| match Regex::new(r#"^[a-zA-Z0-9_]+$"#) {
        Ok(re) => Some(re),
        Err(e) => {
            eprintln!("CRITICAL: Failed to compile validation regex, using fallback: {:?}", e);
            None
        }
    });

    let is_valid = compiled
        .as_ref()
        .map_or_else(|| Self::name_matches_pattern(name), |re| re.is_match(name));
    if is_valid {
        return Ok(());
    }
    Err(DataFusionError::Plan(format!(
        "{} name '{}' is not valid. Only alphanumeric characters and underscores (A-Z, a-z, 0-9, _) are allowed",
        name_type, name
    )))
}
pub fn create_database(&mut self, db_name: &str) -> Result<(), DataFusionError> {
Self::validate_name(db_name, NameType::Database)?;
self.metadata = self
.get_metadata_cached_sync()
.map_err(|e| DataFusionError::Execution(format!("Failed to reload metadata: {}", e)))?;
let db_data_path = format!("{}/{}", self.data_path, db_name);
if let Err(e) = fs::create_dir(&db_data_path) {
return Err(DataFusionError::Execution(format!("Error creating data directory {}: {}", db_name, e)));
}
self
.metadata
.databases
.entry(db_name.to_string())
.or_insert_with(|| Database { tables: HashMap::new() });
self
.save_metadata()
.map_err(|e| DataFusionError::Execution(format!("Failed to save metadata: {}", e)))?;
Ok(())
}
/// Registers a new table with the given JSON schema and creates its directory.
///
/// Returns a human-readable confirmation message.
///
/// # Errors
/// Fails when a name is invalid, metadata cannot be reloaded or saved,
/// `schema_json` is not valid JSON, the database does not exist, the schema
/// structure is rejected, the table already exists, or the directory cannot
/// be created.
pub fn create_table(&mut self, db_name: &str, table_name: &str, schema_json: &str) -> Result<String, Box<dyn Error>> {
    Self::validate_name(db_name, NameType::Database).map_err(|e| e.to_string())?;
    Self::validate_name(table_name, NameType::Table).map_err(|e| e.to_string())?;

    self.metadata = self
        .get_metadata_cached_sync()
        .map_err(|e| DataFusionError::Execution(format!("Failed to reload metadata: {}", e)))?;

    let schema: Value = serde_json::from_str(schema_json)?;

    // The database must exist before the schema is even looked at.
    if !self.metadata.databases.contains_key(db_name) {
        return Err(format!("Database '{}' does not exist.", db_name).into());
    }
    self.validate_schema_structure(&schema)?;

    let database = match self.metadata.databases.get_mut(db_name) {
        Some(database) => database,
        None => return Err(format!("Database '{}' does not exist.", db_name).into()),
    };
    if database.tables.contains_key(table_name) {
        return Err(format!("Table '{}' already exists in database '{}'.", table_name, db_name).into());
    }

    let table_path = format!("{}/{}/{}", self.data_path, db_name, table_name);
    fs::create_dir_all(&table_path)?;
    database
        .tables
        .insert(table_name.to_string(), Table { schema, path: table_path });

    self.save_metadata()?;
    Ok(format!("Table '{}' was successfully created in database '{}'.", table_name, db_name))
}
/// Returns the names of all databases recorded in the metadata file.
///
/// # Errors
/// `Execution` when metadata cannot be reloaded, or the metadata file cannot
/// be read or parsed.
// NOTE(review): the in-memory copy is refreshed first, but the returned list
// is built from a second, direct read of the metadata file rather than from
// `self.metadata` (unlike `list_tables`). Confirm whether bypassing the cache
// here is intentional.
pub fn list_databases(&mut self) -> Result<Vec<String>, DataFusionError> {
    self.metadata = self
        .get_metadata_cached_sync()
        .map_err(|e| DataFusionError::Execution(format!("Failed to reload metadata: {}", e)))?;

    let file_content = fs::read_to_string(&self.metadata_path)
        .map_err(|e| DataFusionError::Execution(format!("Failed to read metadata file: {}", e)))?;
    let on_disk: Metadata =
        serde_json::from_str(&file_content).map_err(|e| DataFusionError::Execution(format!("Failed to parse metadata: {}", e)))?;

    let mut names = Vec::with_capacity(on_disk.databases.len());
    names.extend(on_disk.databases.keys().cloned());
    Ok(names)
}
/// Returns the names of all tables in `db_name`, after refreshing metadata.
///
/// # Errors
/// * `Plan` — invalid database name, or the database is not in the metadata.
/// * `Execution` — metadata could not be reloaded.
pub fn list_tables(&mut self, db_name: &str) -> Result<Vec<String>, DataFusionError> {
    Self::validate_name(db_name, NameType::Database)?;
    self.metadata = self
        .get_metadata_cached_sync()
        .map_err(|e| DataFusionError::Execution(format!("Failed to reload metadata: {}", e)))?;

    match self.metadata.databases.get(db_name) {
        Some(database) => Ok(database.tables.keys().cloned().collect()),
        None => Err(DataFusionError::Plan(format!("Database '{}' not found", db_name))),
    }
}
/// Removes `db_name` from the metadata and deletes its data directory.
///
/// Metadata is updated and saved first; the directory is removed afterwards.
/// A directory that is already gone is treated as success, because by then the
/// database no longer exists in metadata and the requested end state is reached.
///
/// # Errors
/// * `Plan` — invalid name, database not present in metadata, or the directory
///   could not be removed (the underlying I/O error is included).
/// * `Execution` — metadata could not be reloaded or saved.
pub fn delete_database(&mut self, db_name: &str) -> Result<(), DataFusionError> {
    Self::validate_name(db_name, NameType::Database)?;
    self.metadata = self
        .get_metadata_cached_sync()
        .map_err(|e| DataFusionError::Execution(format!("Failed to reload metadata: {}", e)))?;

    if self.metadata.databases.remove(db_name).is_none() {
        return Err(DataFusionError::Plan(format!("Failed to remove database '{}' from metadata", db_name)));
    }
    self.save_metadata().map_err(|e| {
        eprintln!("Error saving metadata after deleting database '{}': {}", db_name, e);
        DataFusionError::Execution(format!("Failed to save metadata after deleting database: {}", e))
    })?;

    let db_path = format!("{}/{}", self.data_path, db_name);
    match fs::remove_dir_all(&db_path) {
        Ok(()) => Ok(()),
        // Nothing left on disk to delete: the database is gone either way.
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
        Err(e) => Err(DataFusionError::Plan(format!(
            "Failed to remove database directory '{}': {}",
            db_name, e
        ))),
    }
}
/// Removes `table_name` from `db_name` in the metadata and deletes its directory.
///
/// A metadata reload failure whose message contains "No such file" or
/// "not found" is tolerated and the in-memory metadata is used as is; any other
/// reload failure aborts. After the metadata is saved, a table directory that
/// is already gone is treated as success.
///
/// # Errors
/// * `Plan` — invalid name, unknown database or table, or the directory could
///   not be removed (the underlying I/O error is included).
/// * `Execution` — metadata could not be reloaded or saved.
pub fn delete_table(&mut self, db_name: &str, table_name: &str) -> Result<(), DataFusionError> {
    Self::validate_name(db_name, NameType::Database)?;
    Self::validate_name(table_name, NameType::Table)?;

    match self.get_metadata_cached_sync() {
        Ok(metadata) => {
            self.metadata = metadata;
        }
        Err(e) => {
            let error_msg = e.to_string();
            let metadata_missing = error_msg.contains("No such file") || error_msg.contains("not found");
            if !metadata_missing {
                return Err(DataFusionError::Execution(format!("Failed to reload metadata: {}", e)));
            }
        }
    }

    let db = self
        .metadata
        .databases
        .get_mut(db_name)
        .ok_or_else(|| DataFusionError::Plan(format!("Database '{}' not found", db_name)))?;
    if db.tables.remove(table_name).is_none() {
        return Err(DataFusionError::Plan(format!(
            "Table '{}' not found in database '{}'",
            table_name, db_name
        )));
    }

    self.save_metadata().map_err(|e| {
        eprintln!("Error saving metadata after deleting table '{}': {}", table_name, e);
        DataFusionError::Execution(format!("Failed to save metadata after deleting table: {}", e))
    })?;

    let table_path = format!("{}/{}/{}", self.data_path, db_name, table_name);
    match fs::remove_dir_all(&table_path) {
        Ok(()) => Ok(()),
        // Nothing left on disk to delete: the table is gone either way.
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
        Err(e) => Err(DataFusionError::Plan(format!(
            "Failed to remove table directory '{}': {}",
            table_name, e
        ))),
    }
}
pub fn insert(&mut self, db_name: &str, table_name: &str, json_data: &str) -> Result<Vec<Value>, Box<dyn Error>> {
Self::validate_name(db_name, NameType::Database).map_err(|e| e.to_string())?;
Self::validate_name(table_name, NameType::Table).map_err(|e| e.to_string())?;
self.metadata = self.get_metadata_cached_sync()?;
let mut new_json_values: Vec<Value> = serde_json::from_str(json_data)?;
let table_path = self
.get_table_path(db_name, table_name)
.ok_or_else(|| format!("Database '{}' or Table '{}' does not exist.", db_name, table_name))?;
let table_schema = self.get_table_schema(db_name, table_name)?;
let conditions = build_rules_tree(table_schema.clone());
let mut invalid_json_values = Vec::new();
if !conditions.is_empty() {
let tree = json_rules_engine::and(conditions);
for json_value in &new_json_values {
let result = tree.check_value(json_value);
if result.status == json_rules_engine::Status::NotMet {
println!("record condition mismatch: {}", json_value);
invalid_json_values.push(json_value.clone());
}
}
}
let datetime_binding = get_property_fields(&table_schema, "datetime")?;
let datetime_field = datetime_binding
.get(0)
.ok_or_else(|| format!("No 'datetime' field found in the table schema."))?;
let unique_fields = get_property_fields(&table_schema, "unique")?;
let build_key = |record: &Value| -> String {
unique_fields
.iter()
.map(|field| record.get(field).map(|v| v.to_string()).unwrap_or_default())
.collect::<Vec<String>>()
.join("-")
};
let schema_field_order: Vec<String> = table_schema.as_object().map(|o| o.keys().cloned().collect()).unwrap_or_default();
for json_value in new_json_values.iter_mut() {
match json_value.get(datetime_field) {
Some(Value::String(date_str)) => {
let parsed_timestamp = NaiveDateTime::parse_from_str(date_str, "%Y.%m.%d %H:%M:%S")
.or_else(|_| NaiveDateTime::parse_from_str(date_str, "%Y-%m-%dT%H:%M:%S%.3fZ"))
.or_else(|_| NaiveDateTime::parse_from_str(date_str, "%Y-%m-%dT%H:%M:%S%.fZ"))
.or_else(|_| NaiveDateTime::parse_from_str(date_str, "%Y-%m-%d %H:%M:%S"))
.map(|naive_dt| Utc.from_utc_datetime(&naive_dt).timestamp());
match parsed_timestamp {
Ok(timestamp) => {
json_value[datetime_field] = json!(timestamp);
}
Err(_) => return Err(format!("Invalid datetime format for field '{}'.", datetime_field).into()),
}
}
_ => return Err(format!("Missing required datetime field: '{}'.", datetime_field).into()),
}
}
for json_value in new_json_values.iter_mut() {
self.validate_data_against_schema(&table_schema, json_value)?;
}
let mut seen_records: HashMap<String, Value> = HashMap::new();
let mut records_by_file: HashMap<String, Vec<Value>> = HashMap::new();
for new_record in new_json_values.into_iter() {
let key = build_key(&new_record);
if seen_records.insert(key.clone(), new_record.clone()).is_some() {
continue;
}
let timestamp = new_record.get(datetime_field).and_then(|t| t.as_i64()).unwrap_or(0);
let partition_value = rounded_timestamp(timestamp, self.bucket_interval);
let partition_dir = format!("{}/partition_date={}", table_path, partition_value);
if let Err(e) = fs::create_dir_all(&partition_dir) {
eprintln!("Error creating partition directory '{}': {}", partition_dir, e);
return Err(format!("Failed to create partition directory '{}': {}", partition_dir, e).into());
}
let target_file = format!("{}/data.parquet", partition_dir);
records_by_file.entry(target_file).or_insert_with(Vec::new).push(new_record);
}
for (file_path, new_records) in records_by_file {
let file_path_clone = file_path.clone();
if let Err(e) = Self::atomic_file_insert(
Path::new(&file_path_clone),
&new_records,
&build_key,
Some(&schema_field_order),
Some(&table_schema),
) {
eprintln!("Error in atomic_file_insert for '{}': {}", file_path_clone, e);
return Err(format!("Failed to insert records into file '{}': {}", file_path_clone, e).into());
}
}
Ok(invalid_json_values)
}
pub async fn query(
&self,
db_name: &str,
sql_query: &str,
username: Option<&str>,
is_json_format: bool,
limit_partitions: Option<usize>,
) -> DataFusionResult<DataFusionOutput> {
Self::validate_name(db_name, NameType::Database)?;
let (mut table_names, cte_names) =
extract_table_names_and_ctes(&sql_query).map_err(|e| DataFusionError::Execution(format!("Failed to extract table names: {}", e)))?;
for table_name in &table_names {
Self::validate_name(table_name, NameType::Table)?;
}
table_names.retain(|name| !cte_names.contains(name));
let metadata = self
.get_metadata_cached()
.await
.map_err(|e| DataFusionError::Execution(format!("Failed to read metadata: {}", e)))?;
if let Some(database) = metadata.databases.get(db_name) {
for table_name in &table_names {
if !database.tables.contains_key(table_name) {
if username.is_none() {
return Err(DataFusionError::Plan(format!(
"Table '{}' referenced in query does not exist in database '{}'",
table_name, db_name
)));
}
}
}
for table_name in &table_names {
self.register_single_table(db_name, table_name, username).await?;
}
}
let effective_sql = if let Some(limit) = limit_partitions {
let mut all_partitions = Vec::new();
if let Some(database) = metadata.databases.get(db_name) {
for (table_name, _) in &database.tables {
let table_dir = self
.resolve_table_dir(db_name, table_name, username)
.map_err(|e| DataFusionError::Execution(format!("Failed to resolve table directory: {}", e)))?;
if let Ok(entries) = std::fs::read_dir(&table_dir) {
for entry in entries.flatten() {
if entry.path().is_dir() {
if let Some(name) = entry.path().file_name().and_then(|n| n.to_str()) {
if name.starts_with("partition_date=") {
let date_value = name.strip_prefix("partition_date=").unwrap_or("");
if !all_partitions.contains(&date_value.to_string()) {
all_partitions.push(date_value.to_string());
}
}
}
}
}
}
}
}
all_partitions.sort();
let selected_dates: Vec<_> = all_partitions.iter().rev().take(limit).cloned().collect();
if !selected_dates.is_empty() {
let date_list = selected_dates.iter().map(|d| format!("'{}'", d)).collect::<Vec<_>>().join(", ");
let has_where = sql_query.to_uppercase().contains("WHERE");
if has_where {
format!("{} AND partition_date IN ({})", sql_query, date_list)
} else {
format!("{} WHERE partition_date IN ({})", sql_query, date_list)
}
} else {
sql_query.to_string()
}
} else {
sql_query.to_string()
};
let final_df = self.session_context.sql(&effective_sql).await?;
let final_results = final_df.collect().await?;
let result = if is_json_format {
let json_result = record_batches_to_json(&final_results)
.map_err(|e| DataFusionError::Execution(format!("Failed to convert record batches to JSON: {:?}", e)))?;
DataFusionOutput::Json(json_result)
} else {
let final_schema = final_results[0].schema();
let final_mem_table = MemTable::try_new(final_schema, vec![final_results])?;
let final_df = self.session_context.read_table(Arc::new(final_mem_table))?;
DataFusionOutput::DataFrame(final_df)
};
Ok(result)
}
async fn register_single_table(&self, db_name: &str, table_name: &str, username: Option<&str>) -> DataFusionResult<()> {
let _ = self.session_context.deregister_table(table_name);
let table_dir = match self.resolve_table_dir(db_name, table_name, username) {
Ok(dir) => dir,
Err(e) => {
if username.is_some() {
eprintln!(
"INFO: Table '{}' does not exist for group user '{:?}': {}. Skipping registration - query will return 0 rows for this table.",
table_name, username, e
);
return Ok(());
} else {
let error = TimonError::table_not_found_for_user(table_name, username);
return Err(DataFusionError::Plan(error.to_string()));
}
}
};
let has_parquet_files = std::fs::read_dir(&table_dir)
.map(|entries| {
entries.filter_map(|e| e.ok()).any(|e| {
let path = e.path();
path.is_file() && path.extension().and_then(|s| s.to_str()) == Some("parquet")
|| (path.is_dir() && {
std::fs::read_dir(&path)
.map(|sub_entries| {
sub_entries
.filter_map(|se| se.ok())
.any(|se| se.path().extension().and_then(|s| s.to_str()) == Some("parquet"))
})
.unwrap_or(false)
})
})
})
.unwrap_or(false);
if !has_parquet_files {
if username.is_some() {
eprintln!(
"INFO: Table '{}' exists for group user '{:?}' but contains no parquet files. Skipping registration - query will return 0 rows for this table.",
table_name, username
);
return Ok(());
} else {
let error = TimonError::no_data_available(table_name);
return Err(DataFusionError::Plan(error.to_string()));
}
}
let file_format = ParquetFormat::default();
let listing_options = if username.is_none() {
ListingOptions::new(Arc::new(file_format))
.with_file_extension(".parquet")
.with_table_partition_cols(vec![(
"partition_date".to_string(),
DataType::Utf8, )])
} else {
ListingOptions::new(Arc::new(file_format)).with_file_extension(".parquet")
};
let table_url = ListingTableUrl::parse(&table_dir).map_err(|e| DataFusionError::Execution(format!("Failed to parse table URL: {}", e)))?;
let merged_schema = match infer_schema_with_coercion(&table_dir).await {
Ok(schema) => {
eprintln!(
"Successfully merged schema for table '{}' with {} fields",
table_name,
schema.fields().len()
);
Some(schema)
}
Err(e) => {
eprintln!(
"Warning: Failed to infer schema with coercion for table '{}': {}. Falling back to DataFusion's default inference.",
table_name, e
);
None
}
};
let config = if let Some(schema) = merged_schema {
ListingTableConfig::new(table_url)
.with_listing_options(listing_options)
.with_schema(schema)
} else {
ListingTableConfig::new(table_url)
.with_listing_options(listing_options)
.infer_schema(&self.session_context.state())
.await?
};
let listing_table = ListingTable::try_new(config)?;
self
.session_context
.register_table(table_name, Arc::new(listing_table))
.map_err(|e| DataFusionError::Execution(format!("Failed to register table '{}': {}", table_name, e)))?;
Ok(())
}
/// Resolves the directory that holds the data of `db_name.table_name`.
///
/// * Without a user name: the path recorded in the metadata; the table must exist.
/// * With a user name: `<root>/group/<user>/<db>/<table>`, where `<root>` is
///   three levels above a table path of this database (the requested table's
///   path, or any other table's path when the requested one is not in the
///   metadata). The directory must exist.
///
/// # Errors
/// Fails on invalid names, a user name that is not made of plain path
/// components, unreadable metadata, unknown database, unknown table (no user
/// name), a database without any table to derive the root from, or a missing
/// group directory.
fn resolve_table_dir(&self, db_name: &str, table_name: &str, username: Option<&str>) -> Result<String, Box<dyn Error>> {
    Self::validate_name(db_name, NameType::Database).map_err(|e| e.to_string())?;
    Self::validate_name(table_name, NameType::Table).map_err(|e| e.to_string())?;

    // The user name becomes part of a filesystem path. `Path::join` replaces
    // the whole base when given an absolute path and happily walks through
    // `..`, so anything but plain components would escape the storage root.
    if let Some(user) = username {
        let only_plain_components = Path::new(user)
            .components()
            .all(|component| matches!(component, std::path::Component::Normal(_)));
        if !only_plain_components {
            return Err(format!("Invalid username '{}': path traversal components are not allowed.", user).into());
        }
    }

    let metadata = self.get_metadata_cached_sync()?;
    let database = metadata
        .databases
        .get(db_name)
        .ok_or_else(|| format!("Database '{}' does not exist.", db_name))?;

    let base_table_path = match database.tables.get(table_name) {
        Some(table) => Path::new(&table.path),
        None => {
            if username.is_none() {
                return Err(format!("Table '{}' does not exist in database '{}'.", table_name, db_name).into());
            }
            // Group tables need not be in the metadata; borrow any sibling
            // table's path just to locate the storage root.
            match database.tables.values().next() {
                Some(any_table) => Path::new(&any_table.path),
                None => {
                    return Err(format!("Database '{}' has no tables to determine base path structure.", db_name).into());
                }
            }
        }
    };

    // <root>/data/<db>/<table>: two ancestors up is `<root>/data`, its parent is `<root>`.
    let base_root = base_table_path
        .ancestors()
        .nth(2)
        .ok_or_else(|| format!("Failed to determine base directory from '{}'", base_table_path.display()))?
        .parent()
        .ok_or("Failed to get parent of base directory")?
        .to_path_buf();

    let Some(user) = username else {
        return Ok(base_table_path.to_string_lossy().to_string());
    };

    let group_path = base_root.join("group").join(user).join(db_name).join(table_name);
    if group_path.exists() {
        return Ok(group_path.to_string_lossy().to_string());
    }
    Err(
        format!(
            "Table '{}' does not exist in group path for user '{}'. Group users should only access tables that exist in their path.",
            table_name, user
        )
        .into(),
    )
}
/// Merges `new_records` into the parquet file at `file_path` and rewrites it.
///
/// The whole read-merge-write cycle runs under a per-path mutex taken from the
/// process-wide lock registry, so concurrent inserts into the same file from
/// this process are serialised. Records are matched by `build_key`: a new
/// record replaces an existing one with the same key, otherwise it is appended.
///
/// At most once per `LOCK_CLEANUP_INTERVAL` the registry is swept for stale
/// entries before the lock is looked up.
///
/// If the existing file cannot be read, the failure is logged and the file is
/// rebuilt from `new_records` alone.
///
/// # Errors
/// Fails when the parent directory cannot be created, a mutex is poisoned,
/// there is nothing to write, the records cannot be converted to Arrow, or the
/// parquet file cannot be written.
fn atomic_file_insert(
    file_path: &Path,
    new_records: &[Value],
    build_key: &dyn Fn(&Value) -> String,
    preferred_field_order: Option<&[String]>,
    table_schema: Option<&Value>,
) -> Result<(), Box<dyn Error>> {
    if let Some(parent) = file_path.parent() {
        fs::create_dir_all(parent)?;
    }
    let file_path_str = file_path.to_string_lossy().to_string();

    // Look up (or create) this file's mutex while holding the registry lock,
    // then release the registry before blocking on the file mutex itself.
    let file_mutex = {
        let now = Instant::now();
        let mut locks = get_file_locks()
            .lock()
            .map_err(|e| format!("Failed to acquire file locks mutex (poisoned): {}", e))?;
        static LAST_CLEANUP: OnceLock<Mutex<Instant>> = OnceLock::new();
        let last_cleanup = LAST_CLEANUP.get_or_init(|| Mutex::new(Instant::now()));
        if let Ok(mut last) = last_cleanup.lock() {
            if now.duration_since(*last) >= LOCK_CLEANUP_INTERVAL {
                cleanup_unused_locks(&mut locks);
                *last = now;
            }
        }
        let entry = locks.entry(file_path_str.clone()).or_insert_with(|| FileLockEntry {
            lock: Arc::new(Mutex::new(())),
            last_accessed: now,
        });
        entry.last_accessed = now;
        entry.lock.clone()
    };
    let _guard = file_mutex
        .lock()
        .map_err(|e| format!("Failed to acquire file mutex for {} (poisoned): {}", file_path_str, e))?;

    let mut existing_records: Vec<Value> = if file_path.exists() {
        match Self::read_parquet_file_static(file_path) {
            Ok(records) => records,
            Err(e) => {
                // The file is about to be overwritten without its previous
                // contents; make that visible instead of failing silently.
                eprintln!(
                    "Warning: Failed to read existing parquet file '{}': {}. Its previous records will not be carried over.",
                    file_path.display(),
                    e
                );
                Vec::new()
            }
        }
    } else {
        Vec::new()
    };

    // key -> index into `existing_records`; for duplicate keys the last index wins.
    let mut existing_keys: HashMap<String, usize> = HashMap::with_capacity(existing_records.len());
    for (index, record) in existing_records.iter().enumerate() {
        existing_keys.insert(build_key(record), index);
    }

    let initial_count = existing_records.len();
    for new_record in new_records {
        let key = build_key(new_record);
        if let Some(&existing_index) = existing_keys.get(&key) {
            existing_records[existing_index] = new_record.clone();
        } else {
            existing_records.push(new_record.clone());
            existing_keys.insert(key, existing_records.len() - 1);
        }
    }

    if existing_records.is_empty() {
        return Err(
            format!(
                "No records to write to '{}' (this should not happen - had {} existing, {} new)",
                file_path.display(),
                initial_count,
                new_records.len()
            )
            .into(),
        );
    }

    let (arrays, schema) = json_to_arrow(&existing_records, preferred_field_order, table_schema)
        .map_err(|e| format!("Failed to convert records to arrow format for '{}': {}", file_path.display(), e))?;
    Self::parquet_file_writer_locked(file_path, schema, arrays)
        .map_err(|e| format!("Failed to write parquet file '{}': {}", file_path.display(), e))?;
    Ok(())
}
/// Opens the parquet file at `file_path`, checks that it is readable and,
/// optionally, that its schema matches `expected_schema`.
///
/// Checks performed: the file opens and parses as parquet, it has at least one
/// row group, and reading the first batch does not fail. With an expected
/// schema, the field count, then each field's name and data type (compared
/// position by position) must match.
///
/// Returns the schema found in the file.
fn validate_parquet_file(file_path: &Path, expected_schema: Option<&Schema>) -> Result<Schema, Box<dyn Error>> {
    let file = fs::File::open(file_path).map_err(|e| format!("Failed to open parquet file '{}' for validation: {}", file_path.display(), e))?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
        format!(
            "Failed to create parquet reader for '{}': file may be corrupted or invalid parquet format: {}",
            file_path.display(),
            e
        )
    })?;
    let actual_schema = builder.schema().as_ref().clone();

    if builder.metadata().num_row_groups() == 0 {
        return Err(format!("Parquet file '{}' has no row groups", file_path.display()).into());
    }

    let mut reader = builder
        .build()
        .map_err(|e| format!("Failed to build parquet reader for '{}': {}", file_path.display(), e))?;
    // Only a failing first batch is an error; a good batch or no batch at all is fine.
    if let Some(Err(e)) = reader.next() {
        return Err(format!("Failed to read data from parquet file '{}': {}", file_path.display(), e).into());
    }

    let Some(expected) = expected_schema else {
        return Ok(actual_schema);
    };

    let expected_count = expected.fields().len();
    let actual_count = actual_schema.fields().len();
    if actual_count != expected_count {
        return Err(
            format!(
                "Schema mismatch in parquet file '{}': expected {} fields, got {}",
                file_path.display(),
                expected_count,
                actual_count
            )
            .into(),
        );
    }

    for (want, got) in expected.fields().iter().zip(actual_schema.fields().iter()) {
        if want.name() != got.name() {
            return Err(
                format!(
                    "Schema field name mismatch in parquet file '{}': expected '{}', got '{}'",
                    file_path.display(),
                    want.name(),
                    got.name()
                )
                .into(),
            );
        }
        if want.data_type() != got.data_type() {
            return Err(
                format!(
                    "Schema field type mismatch in parquet file '{}' for field '{}': expected {:?}, got {:?}",
                    file_path.display(),
                    want.name(),
                    want.data_type(),
                    got.data_type()
                )
                .into(),
            );
        }
    }
    Ok(actual_schema)
}
/// Reads every row of the parquet file at `file_path` and converts each one to
/// a JSON value via `row_to_json`.
///
/// # Errors
/// Fails when the file cannot be opened or parsed, or when any row cannot be
/// read. A row failure is reported as an `io::Error` of kind `Other` whose
/// message includes the underlying cause.
fn read_parquet_file_static(file_path: &Path) -> Result<Vec<Value>, Box<dyn Error>> {
    let file = fs::File::open(file_path)?;
    let reader = SerializedFileReader::new(file)?;

    let mut json_records = Vec::new();
    for record_result in reader.get_row_iter(None)? {
        match record_result {
            Ok(record) => json_records.push(row_to_json(&record)),
            Err(e) => {
                // Keep the error type callers already see, but do not throw away the cause.
                return Err(Box::new(std::io::Error::new(
                    std::io::ErrorKind::Other,
                    format!("Error reading record from parquet file: {}", e),
                )));
            }
        }
    }
    Ok(json_records)
}
/// Writes `array` (columns matching `schema`) as a single-batch parquet file at
/// `path`, replacing any existing file.
///
/// The data is written to `<file name>.<nanosecond timestamp>.tmp` next to the
/// destination, flushed to disk, validated, and only then renamed over `path`.
/// Because validation happens before the rename, a file that fails validation
/// never replaces the previous one. After the rename the parent directory is
/// synced on a best-effort basis.
///
/// Returns a confirmation message naming the destination.
///
/// # Errors
/// Fails when the parent directory cannot be created, the system clock is
/// before the UNIX epoch, writing or syncing the temp file fails, the written
/// file does not validate against `schema`, or the rename fails. In every
/// failure case after the temp file was created, the temp file is removed.
fn parquet_file_writer_locked(path: &Path, schema: Schema, array: Vec<Arc<dyn Array>>) -> Result<String, Box<dyn Error>> {
    use std::time::{SystemTime, UNIX_EPOCH};

    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)?;
    }

    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_err(|e| format!("System time is before UNIX epoch: {:?}", e))?
        .as_nanos();
    let temp_path = path
        .parent()
        .map(|p| {
            p.join(format!(
                "{}.{}.tmp",
                path.file_name().and_then(|n| n.to_str()).unwrap_or("data.parquet"),
                timestamp
            ))
        })
        .unwrap_or_else(|| path.with_file_name(format!("data.parquet.{}.tmp", timestamp)));

    let cleanup_temp = |temp_path: &Path| {
        if temp_path.exists() {
            if let Err(e) = fs::remove_file(temp_path) {
                eprintln!("Warning: Failed to clean up temp file {:?}: {}", temp_path, e);
            }
        }
    };

    let schema = Arc::new(schema);

    // Step 1: write and fsync the temp file.
    let write_result = (|| -> Result<(), Box<dyn Error>> {
        let temp_file = fs::File::create(&temp_path)?;
        let props = WriterProperties::builder().build();
        let mut writer = ArrowWriter::try_new(&temp_file, Arc::clone(&schema), Some(props))?;
        let combined_batch = RecordBatch::try_new(Arc::clone(&schema), array)?;
        writer.write(&combined_batch)?;
        writer.close()?;
        temp_file.sync_all()?;
        drop(temp_file);
        Ok(())
    })();
    if let Err(e) = write_result {
        cleanup_temp(&temp_path);
        return Err(e);
    }

    // Step 2: validate BEFORE publishing, so a bad file never replaces a good one.
    if let Err(e) = Self::validate_parquet_file(&temp_path, Some(schema.as_ref())) {
        cleanup_temp(&temp_path);
        return Err(format!("Parquet file validation failed after write: {}", e).into());
    }

    // Step 3: publish by renaming over the destination.
    if let Err(e) = fs::rename(&temp_path, path) {
        cleanup_temp(&temp_path);
        return Err(Box::new(e));
    }

    // Step 4: best-effort sync of the directory entry.
    if let Some(parent) = path.parent() {
        if let Ok(parent_file) = fs::File::open(parent) {
            if let Err(e) = parent_file.sync_all() {
                eprintln!("Warning: Failed to sync parent directory {:?} after file rename: {}", parent, e);
            }
        }
    }

    Ok(format!("Data was successfully written to '{}'", path.to_string_lossy()))
}
/// Returns the sorted list of all files (at any depth) under the directory of
/// `db_name.table_name`.
///
/// Without a user name the directory is the table path from the metadata; with
/// one it is `<base>/group/<user>/<db>/<table>`, where `<base>` is two levels
/// above the table path.
///
/// # Errors
/// Fails on invalid names, a user name that is not made of plain path
/// components, unreadable metadata, unknown database or table, a missing
/// directory, or an I/O error while walking it.
// NOTE(review): `<base>` here is `ancestors().nth(2)` of the table path, while
// `resolve_table_dir` goes one level further up (`.parent()`) before appending
// `group/...`. The two functions therefore point at different group
// directories — confirm which layout is intended.
pub fn build_files_list(&self, db_name: &str, table_name: &str, username: Option<&str>) -> Result<Vec<String>, Box<dyn Error>> {
    Self::validate_name(db_name, NameType::Database).map_err(|e| e.to_string())?;
    Self::validate_name(table_name, NameType::Table).map_err(|e| e.to_string())?;

    // The user name becomes part of a filesystem path. `Path::join` replaces
    // the whole base when given an absolute path and happily walks through
    // `..`, so anything but plain components would escape the storage root.
    if let Some(user) = username {
        let only_plain_components = Path::new(user)
            .components()
            .all(|component| matches!(component, std::path::Component::Normal(_)));
        if !only_plain_components {
            return Err(format!("Invalid username '{}': path traversal components are not allowed.", user).into());
        }
    }

    let metadata = self
        .get_metadata_cached_sync()
        .map_err(|e| DataFusionError::Execution(format!("Failed to reload metadata: {}", e)))?;
    let database = metadata
        .databases
        .get(db_name)
        .ok_or_else(|| format!("Database '{}' does not exist.", db_name))?;
    let table = database
        .tables
        .get(table_name)
        .ok_or_else(|| format!("Table '{}' does not exist in database '{}'.", table_name, db_name))?;

    let base_table_path = Path::new(&table.path);
    let base_root = base_table_path
        .ancestors()
        .nth(2)
        .ok_or_else(|| format!("Failed to determine base directory from '{}'", base_table_path.display()))?
        .to_path_buf();

    let final_table_path = match username {
        Some(user) => base_root.join("group").join(user).join(db_name).join(table_name),
        None => base_table_path.to_path_buf(),
    };
    if !final_table_path.exists() {
        return Err(format!("Table path '{}' does not exist.", final_table_path.display()).into());
    }

    let mut file_list = Vec::new();
    self.collect_files_recursive(&final_table_path, &mut file_list)?;
    file_list.sort();
    Ok(file_list)
}
/// Appends the path of every regular file found beneath `dir` to `file_list`,
/// descending into subdirectories as they are encountered.
///
/// A `dir` that is not a directory is ignored. Paths are stored through a lossy
/// UTF-8 conversion. Any error from reading a directory or one of its entries is
/// propagated to the caller.
fn collect_files_recursive(&self, dir: &Path, file_list: &mut Vec<String>) -> Result<(), Box<dyn Error>> {
    if !dir.is_dir() {
        return Ok(());
    }
    for dir_entry in fs::read_dir(dir)? {
        let child = dir_entry?.path();
        if child.is_dir() {
            self.collect_files_recursive(&child, file_list)?;
            continue;
        }
        if child.is_file() {
            file_list.push(child.to_string_lossy().into_owned());
        }
    }
    Ok(())
}
fn validate_schema_structure(&self, schema: &Value) -> Result<(), Box<dyn Error>> {
let schema_obj = schema.as_object().ok_or("Schema should be a JSON object")?;
for (field_name, field_rules) in schema_obj {
let field_rules_obj = field_rules
.as_object()
.ok_or(format!("Invalid validation rules for field '{}'", field_name))?;
if !field_rules_obj.contains_key("type") {
return Err(format!("Field '{}' is missing a 'type' definition.", field_name).into());
}
if let Some(required) = field_rules_obj.get("required") {
if !required.is_boolean() {
return Err(format!("Field '{}' has an invalid 'required' value. Must be true or false.", field_name).into());
}
}
}
Ok(())
}
/// Returns a copy of the schema recorded in the metadata for `db_name.table_name`.
///
/// # Errors
/// Fails when either name does not pass validation, the metadata cannot be loaded,
/// or the database or table is not present in the metadata.
pub fn get_table_schema(&self, db_name: &str, table_name: &str) -> Result<serde_json::Value, Box<dyn Error>> {
    if let Err(e) = Self::validate_name(db_name, NameType::Database) {
        return Err(e.to_string().into());
    }
    if let Err(e) = Self::validate_name(table_name, NameType::Table) {
        return Err(e.to_string().into());
    }
    let metadata = match self.get_metadata_cached_sync() {
        Ok(metadata) => metadata,
        Err(e) => return Err(format!("Failed to reload metadata: {}", e).into()),
    };
    let schema = metadata
        .databases
        .get(db_name)
        .ok_or("Database not found")?
        .tables
        .get(table_name)
        .ok_or("Table not found")?
        .schema
        .clone();
    Ok(schema)
}
fn validate_data_against_schema(&self, schema: &serde_json::Value, json_data: &serde_json::Value) -> Result<(), Box<dyn Error>> {
let schema_obj = schema.as_object().ok_or("Schema should be a JSON object")?;
let data_obj = json_data.as_object().ok_or("Data should be a JSON object")?;
for (key, _value) in data_obj {
if !schema_obj.contains_key(key) {
return Err(format!("Unexpected field: '{}' is not defined in the schema!", key).into());
}
}
for (field_name, field_rules) in schema_obj {
let field_rules_obj = field_rules
.as_object()
.ok_or(format!("Invalid validation rules for field '{}'", field_name))?;
if field_rules_obj.get("required").and_then(|v| v.as_bool()).unwrap_or(false) {
if !data_obj.contains_key(field_name) {
return Err(format!("Missing required field '{}'", field_name).into());
}
}
if let Some(value) = data_obj.get(field_name) {
let field_type = field_rules_obj.get("type").and_then(|v| v.as_str()).unwrap_or("");
self.validate_field_type(field_name, field_type, value)?;
}
}
Ok(())
}
fn validate_field_type(&self, field_name: &str, field_type: &str, value: &serde_json::Value) -> Result<(), Box<dyn Error>> {
fn get_value_type(value: &Value, expected_type: &str) -> (&'static str, &'static str) {
let raw = if value.is_f64() {
"float"
} else if value.is_i64() || value.is_u64() {
"int"
} else if value.is_string() {
"string"
} else if value.is_boolean() {
"bool"
} else if value.is_array() {
"array"
} else {
"unknown"
};
let for_check = if raw == "int" && expected_type.trim() == "float" { "float" } else { raw };
(raw, for_check)
}
let expected_type = field_type.trim();
let (actual_type, type_for_check) = get_value_type(value, expected_type);
if type_for_check != expected_type {
return Err(
format!(
"Type mismatch for field '{}': expected '{}', but got '{}'.",
field_name, field_type, actual_type
)
.into(),
);
}
Ok(())
}
/// Reads and parses the metadata file, retrying on read or parse failures.
///
/// A missing file, or a file containing only whitespace, yields empty metadata
/// immediately. Any other read error, and any JSON parse error, is retried up to
/// `METADATA_READ_MAX_RETRIES` attempts in total with `METADATA_READ_RETRY_DELAY`
/// between attempts; the error from the final attempt is returned.
async fn read_metadata(&self) -> Result<Metadata, Box<dyn Error>> {
    let total_attempts = METADATA_READ_MAX_RETRIES;
    for attempt_no in 1..=total_attempts {
        let failure = match tokio::fs::read_to_string(&self.metadata_path).await {
            Ok(contents) if contents.trim().is_empty() => {
                return Ok(Metadata { databases: HashMap::new() });
            }
            Ok(contents) => match serde_json::from_str::<Metadata>(&contents) {
                Ok(parsed) => return Ok(parsed),
                Err(e) => format!("Failed to parse metadata after {} attempts: {}", total_attempts, e),
            },
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                return Ok(Metadata { databases: HashMap::new() });
            }
            Err(e) => format!("Failed to read metadata after {} attempts: {}", total_attempts, e),
        };
        if attempt_no == total_attempts {
            return Err(failure.into());
        }
        tokio::time::sleep(METADATA_READ_RETRY_DELAY).await;
    }
    // Only reachable when the retry budget is zero.
    Ok(Metadata { databases: HashMap::new() })
}
/// Returns the metadata, serving it from the in-memory cache while the cached
/// entry is younger than `cache_ttl`.
///
/// The cache is refreshed from disk via `read_metadata` when it has no timestamp,
/// when the timestamp is older than `cache_ttl`, or when no metadata is stored.
/// A poisoned cache lock is reported as an error.
async fn get_metadata_cached(&self) -> Result<Metadata, Box<dyn Error>> {
    // Keep the read guard inside this block so it is released before any await.
    let cached = {
        let guard = self
            .cache
            .read()
            .map_err(|e| format!("Failed to acquire read lock on cache (poisoned): {:?}", e))?;
        let is_fresh = matches!(
            guard.timestamp,
            Some(stamp) if Instant::now().duration_since(stamp) <= self.cache_ttl
        );
        if is_fresh {
            guard.metadata.clone()
        } else {
            None
        }
    };
    if let Some(metadata) = cached {
        return Ok(metadata);
    }

    let fresh_metadata = self.read_metadata().await?;
    {
        let mut guard = self
            .cache
            .write()
            .map_err(|e| format!("Failed to acquire write lock on cache (poisoned): {:?}", e))?;
        guard.metadata = Some(fresh_metadata.clone());
        guard.timestamp = Some(Instant::now());
    }
    Ok(fresh_metadata)
}
/// Synchronous wrapper around `get_metadata_cached`.
///
/// Outside a Tokio runtime, a new runtime is created and blocks on the async call
/// directly. Inside a runtime, a clone of `self` is moved to a `spawn_blocking`
/// task that drives the call on a runtime of its own, and the current thread
/// blocks on that task's join handle through `futures::executor::block_on`.
fn get_metadata_cached_sync(&self) -> Result<Metadata, Box<dyn Error>> {
    let Ok(handle) = tokio::runtime::Handle::try_current() else {
        let runtime = tokio::runtime::Runtime::new().map_err(|e| format!("Failed to create tokio runtime: {}", e))?;
        return runtime.block_on(self.get_metadata_cached());
    };

    let owned = self.clone();
    // Errors are flattened to `String` inside the task so its result can cross threads.
    let task = handle.spawn_blocking(move || -> Result<Metadata, String> {
        let runtime = tokio::runtime::Runtime::new().map_err(|e| format!("Failed to create tokio runtime: {}", e))?;
        runtime
            .block_on(owned.get_metadata_cached())
            .map_err(|e| format!("Failed to get metadata: {}", e))
    });
    let outcome = executor::block_on(task).map_err(|e| format!("Failed to join blocking task: {}", e))?;
    outcome.map_err(|e| e.into())
}
/// Clears the cached metadata and its timestamp so the next cached read reloads
/// from disk. If the cache lock is poisoned, a warning is printed and the cache is
/// left untouched.
fn invalidate_cache(&self) {
    match self.cache.write() {
        Ok(mut guard) => {
            guard.metadata = None;
            guard.timestamp = None;
        }
        Err(_) => {
            eprintln!("Warning: Failed to acquire write lock on cache for invalidation (poisoned)");
        }
    }
}
/// Synchronous wrapper around `save_metadata_async`.
///
/// Outside a Tokio runtime, a new runtime is created and blocks on the async save
/// directly. Inside a runtime, a clone of `self` is moved to a `spawn_blocking`
/// task that runs the save on a runtime of its own, and the current thread blocks
/// on that task's join handle through `futures::executor::block_on`. Runtime
/// creation and join failures are reported as `ErrorKind::Other` I/O errors.
fn save_metadata(&self) -> TokioResult<()> {
    fn io_other(message: String) -> std::io::Error {
        std::io::Error::new(std::io::ErrorKind::Other, message)
    }

    let Ok(handle) = tokio::runtime::Handle::try_current() else {
        let runtime = tokio::runtime::Runtime::new().map_err(|e| io_other(format!("Failed to create tokio runtime: {}", e)))?;
        return runtime.block_on(self.save_metadata_async());
    };

    let owned = self.clone();
    let task = handle.spawn_blocking(move || -> TokioResult<()> {
        let runtime = tokio::runtime::Runtime::new().map_err(|e| io_other(format!("Failed to create tokio runtime: {}", e)))?;
        runtime.block_on(owned.save_metadata_async())
    });
    executor::block_on(task).map_err(|e| io_other(format!("Failed to join blocking task: {}", e)))?
}
/// Saves the metadata, retrying when an attempt fails with a transient error.
///
/// `PermissionDenied`, `WouldBlock`, `TimedOut` and `Interrupted` are treated as
/// transient and retried after `METADATA_SAVE_RETRY_DELAY`, for at most
/// `METADATA_SAVE_MAX_RETRIES` attempts in total. Any other error, or an error on
/// the final attempt, is returned to the caller.
async fn save_metadata_async(&self) -> TokioResult<()> {
    let total_attempts = METADATA_SAVE_MAX_RETRIES;
    for attempt_no in 1..=total_attempts {
        let Err(error) = self.save_metadata_attempt().await else {
            return Ok(());
        };
        let is_final = attempt_no == total_attempts;
        let is_transient = matches!(
            error.kind(),
            std::io::ErrorKind::PermissionDenied | std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut | std::io::ErrorKind::Interrupted
        );
        if is_transient && !is_final {
            eprintln!(
                "Transient error saving metadata (attempt {}/{}): {}. Retrying...",
                attempt_no, total_attempts, error
            );
            tokio::time::sleep(METADATA_SAVE_RETRY_DELAY).await;
            continue;
        }
        if is_final {
            eprintln!("Failed to save metadata after {} attempts: {}", total_attempts, error);
        }
        return Err(error);
    }
    // Only reachable when the retry budget is zero.
    Err(std::io::Error::new(
        std::io::ErrorKind::Other,
        "Failed to save metadata after all retry attempts",
    ))
}
/// Performs one attempt at persisting `self.metadata` to `metadata_path`.
///
/// Steps, in order:
/// 1. if a metadata file already exists, back it up (a backup failure only logs a
///    warning);
/// 2. serialize the metadata to JSON and write it to `<metadata_path>.tmp`;
/// 3. sync the temporary file, then rename it over the metadata file;
/// 4. invalidate the in-memory cache;
/// 5. sync the parent directory.
///
/// # Errors
/// Returns the underlying I/O error from the write or rename, an `InvalidData`
/// error when serialization fails, or an `ErrorKind::Other` error when the
/// temporary file or the parent directory cannot be opened or synced. Note that a
/// failure in step 5 is reported even though the new file is already in place.
async fn save_metadata_attempt(&self) -> TokioResult<()> {
    if tokio::fs::metadata(&self.metadata_path).await.is_ok() {
        if let Err(e) = self.create_metadata_backup() {
            eprintln!("Warning: Failed to create metadata backup: {}. Continuing with save...", e);
        }
    }
    let temp_path = format!("{}.tmp", self.metadata_path);
    let json = serde_json::to_string(&self.metadata)
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, format!("Failed to serialize metadata to JSON: {}", e)))?;
    tokio::fs::write(&temp_path, json).await?;
    let temp_file = tokio::fs::File::open(&temp_path).await.map_err(|e| {
        std::io::Error::new(
            std::io::ErrorKind::Other,
            format!("Failed to open temporary metadata file '{}' for syncing: {}", temp_path, e),
        )
    })?;
    temp_file.sync_all().await.map_err(|e| {
        std::io::Error::new(
            std::io::ErrorKind::Other,
            format!("Failed to sync temporary metadata file '{}': {}", temp_path, e),
        )
    })?;
    tokio::fs::rename(&temp_path, &self.metadata_path).await?;

    // The on-disk file has changed as of the rename, so drop the cached copy now.
    // Doing this only after the directory sync would leave a stale cache behind
    // whenever that later step fails.
    self.invalidate_cache();

    // `Path::parent` returns `Some("")` for a bare file name; opening an empty path
    // always fails, so treat that case as the current directory.
    let parent_dir = Path::new(&self.metadata_path)
        .parent()
        .map(|dir| if dir.as_os_str().is_empty() { Path::new(".") } else { dir });
    if let Some(parent) = parent_dir {
        let parent_file = tokio::fs::File::open(parent).await.map_err(|e| {
            std::io::Error::new(
                std::io::ErrorKind::Other,
                format!(
                    "Failed to open parent directory for syncing metadata file '{}': {}",
                    self.metadata_path, e
                ),
            )
        })?;
        parent_file.sync_all().await.map_err(|e| {
            std::io::Error::new(
                std::io::ErrorKind::Other,
                format!("Failed to sync parent directory for metadata file '{}': {}", self.metadata_path, e),
            )
        })?;
    }
    Ok(())
}
/// Copies the current metadata file into `<metadata_path>.backups/` as
/// `metadata.<unix-seconds>.json`, then prunes that directory down to
/// `MAX_METADATA_BACKUPS` files. Does nothing when no metadata file exists.
fn create_metadata_backup(&self) -> Result<(), Box<dyn Error>> {
    use std::time::{SystemTime, UNIX_EPOCH};

    let source = Path::new(&self.metadata_path);
    if source.exists() {
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_err(|e| format!("System time is before UNIX epoch: {:?}", e))?;
        let backup_dir = format!("{}.backups", self.metadata_path);
        fs::create_dir_all(&backup_dir)?;
        let backup_path = format!("{}/metadata.{}.json", backup_dir, since_epoch.as_secs());
        fs::copy(source, &backup_path)?;
        self.cleanup_old_backups(&backup_dir, MAX_METADATA_BACKUPS)?;
    }
    Ok(())
}
/// Deletes the oldest backup files in `backup_dir` so that at most `max_backups`
/// remain.
///
/// Only regular files named `metadata.*.json` are considered, ordered by their
/// modification time; files whose modification time cannot be read are skipped.
/// A failed deletion is logged as a warning and does not abort the cleanup, while
/// errors from listing the directory are propagated.
fn cleanup_old_backups(&self, backup_dir: &str, max_backups: usize) -> Result<(), Box<dyn Error>> {
    use std::time::SystemTime;

    let dir = Path::new(backup_dir);
    if !dir.exists() {
        return Ok(());
    }

    let mut candidates: Vec<(SystemTime, String)> = Vec::new();
    for dir_entry in fs::read_dir(dir)? {
        let file_path = dir_entry?.path();
        if !file_path.is_file() {
            continue;
        }
        let is_backup = file_path
            .file_name()
            .and_then(|name| name.to_str())
            .map_or(false, |name| name.starts_with("metadata.") && name.ends_with(".json"));
        if !is_backup {
            continue;
        }
        if let Ok(modified) = fs::metadata(&file_path).and_then(|meta| meta.modified()) {
            candidates.push((modified, file_path.to_string_lossy().into_owned()));
        }
    }

    // Oldest first; the sort is stable so ties keep their directory-listing order.
    candidates.sort_by(|left, right| left.0.cmp(&right.0));
    let excess = candidates.len().saturating_sub(max_backups);
    for (_, stale_path) in candidates.iter().take(excess) {
        if let Err(e) = fs::remove_file(stale_path) {
            eprintln!("Warning: Failed to remove old backup file '{}': {}", stale_path, e);
        }
    }
    Ok(())
}
/// Rewrites every table path in the metadata to live under `<storage_path>/data`
/// and saves the result.
///
/// The update runs while holding an exclusive lock on
/// `<self.storage_path>/metadata.lock`. Creating or locking that file is retried
/// up to `METADATA_LOCK_MAX_RETRIES` times, sleeping `METADATA_LOCK_RETRY_DELAY`
/// between tries. Each table path becomes
/// `<storage_path>/data/<db_name>/<table_name>`.
///
/// # Errors
/// Returns an `ErrorKind::Other` error when the lock cannot be obtained or the
/// metadata cannot be read, and any error reported by `save_metadata`.
pub fn update_metadata(&mut self, storage_path: &str) -> TokioResult<()> {
    let lock_file_path = format!("{}/metadata.lock", self.storage_path);
    let mut retries = 0;
    let _lock_file = loop {
        let failure = match File::create(&lock_file_path) {
            Ok(file) => match file.lock_exclusive() {
                Ok(()) => break file,
                Err(_) => "Failed to acquire metadata lock after multiple retries",
            },
            Err(_) => "Failed to create lock file after multiple retries",
        };
        if retries >= METADATA_LOCK_MAX_RETRIES {
            return Err(std::io::Error::new(std::io::ErrorKind::Other, failure));
        }
        retries += 1;
        std::thread::sleep(METADATA_LOCK_RETRY_DELAY);
    };

    // Removes the lock file when this function returns, on success or failure.
    // NOTE(review): deleting a lock file that another process may already have
    // opened can let two holders lock different inodes - confirm this is intended.
    struct LockGuard {
        path: String,
    }
    impl Drop for LockGuard {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.path);
        }
    }
    let _lock_guard = LockGuard { path: lock_file_path };

    // This is a read-modify-write under the lock: discard any cached snapshot first
    // so the rewrite starts from what is on disk rather than from a copy that may
    // predate changes saved elsewhere within the cache TTL.
    self.invalidate_cache();

    let new_data_path = format!("{}/data", storage_path);
    let mut metadata = self
        .get_metadata_cached_sync()
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("Failed to read metadata: {}", e)))?;
    for (db_name, db) in metadata.databases.iter_mut() {
        for (table_name, table) in db.tables.iter_mut() {
            table.path = format!("{}/{}/{}", new_data_path, db_name, table_name);
        }
    }
    self.metadata = metadata;
    self.save_metadata()
}
/// Looks up the stored path of `db_name.table_name` in `self.metadata`.
///
/// Returns `None` when either name fails validation or when the database or table
/// is not present.
pub fn get_table_path(&self, db_name: &str, table_name: &str) -> Option<String> {
    let names_valid = Self::validate_name(db_name, NameType::Database).is_ok() && Self::validate_name(table_name, NameType::Table).is_ok();
    if !names_valid {
        return None;
    }
    let table = self.metadata.databases.get(db_name)?.tables.get(table_name)?;
    Some(table.path.clone())
}
}