use crate::io::FileIO;
use crate::spec::TableSchema;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
/// Subdirectory under the table location where schema files are stored.
const SCHEMA_DIR: &str = "schema";
/// Filename prefix for schema files; the schema id is appended, e.g. `schema-3`.
const SCHEMA_PREFIX: &str = "schema-";
/// Loads and caches table schemas stored as JSON files under the table path.
///
/// Cloning a `SchemaManager` is cheap and all clones share the same cache
/// (the map lives behind an `Arc`).
#[derive(Debug, Clone)]
pub struct SchemaManager {
// Backend used to open and read schema files.
file_io: FileIO,
// Root location of the table; schema files live under `<table_path>/schema`.
table_path: String,
// schema_id -> parsed schema, shared across clones. Guarded by a std Mutex;
// it is only locked for short, non-async critical sections.
cache: Arc<Mutex<HashMap<i64, Arc<TableSchema>>>>,
}
impl SchemaManager {
    /// Creates a manager that resolves schema files relative to `table_path`
    /// using the given `file_io` backend. The cache starts empty and is
    /// shared by every clone of the returned manager.
    pub fn new(file_io: FileIO, table_path: String) -> Self {
        Self {
            file_io,
            table_path,
            cache: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Directory holding the per-id schema files, i.e. `<table_path>/schema`.
    /// A trailing `/` on `table_path` is tolerated so the path never doubles up.
    fn schema_directory(&self) -> String {
        format!("{}/{}", self.table_path.trim_end_matches('/'), SCHEMA_DIR)
    }

    /// Full location of the schema file for `schema_id`,
    /// e.g. `<table_path>/schema/schema-42`.
    fn schema_path(&self, schema_id: i64) -> String {
        format!("{}/{}{}", self.schema_directory(), SCHEMA_PREFIX, schema_id)
    }

    /// Returns the schema with the given id, loading and caching it on first use.
    ///
    /// All callers receive the same `Arc` per schema id, even when several
    /// tasks race on the first load.
    ///
    /// # Errors
    /// Propagates I/O errors from `FileIO`, and returns `Error::DataInvalid`
    /// when the file contents cannot be parsed as a `TableSchema`.
    pub async fn schema(&self, schema_id: i64) -> crate::Result<Arc<TableSchema>> {
        // Fast path: serve from cache. The guard is scoped so the lock is
        // released before the `.await` below — a std Mutex guard must never
        // be held across an await point.
        {
            let cache = self.cache.lock().unwrap();
            if let Some(schema) = cache.get(&schema_id) {
                return Ok(Arc::clone(schema));
            }
        }
        let path = self.schema_path(schema_id);
        let input = self.file_io.new_input(&path)?;
        let bytes = input.read().await?;
        let schema: TableSchema =
            serde_json::from_slice(&bytes).map_err(|e| crate::Error::DataInvalid {
                message: format!("Failed to parse schema file: {path}"),
                source: Some(Box::new(e)),
            })?;
        // Two tasks may race to load the same id. Keep whichever Arc landed
        // in the cache first and return *that* one, so every caller shares a
        // single allocation per schema id. (Previously the losing task
        // returned its own freshly-built Arc, leaving two live copies of the
        // same schema in memory.)
        let mut cache = self.cache.lock().unwrap();
        Ok(Arc::clone(
            cache.entry(schema_id).or_insert_with(|| Arc::new(schema)),
        ))
    }
}