use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use crate::expr::visitors::expression_evaluator::ExpressionEvaluator;
use crate::expr::visitors::inclusive_projection::InclusiveProjection;
use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
use crate::expr::{Bind, BoundPredicate};
use crate::spec::{Schema, TableMetadataRef};
use crate::{Error, ErrorKind, Result};
/// Cache of bound partition-filter predicates, keyed by an `i32` id (the
/// `spec_id` passed to `get`).
///
/// The map sits behind an `RwLock` and hands out entries as `Arc`s, so a
/// cached predicate can be returned without copying it.
#[derive(Debug)]
pub(crate) struct PartitionFilterCache(RwLock<HashMap<i32, Arc<BoundPredicate>>>);
impl PartitionFilterCache {
pub(crate) fn new() -> Self {
Self(RwLock::new(HashMap::new()))
}
pub(crate) fn get(
&self,
spec_id: i32,
table_metadata: &TableMetadataRef,
schema: &Schema,
case_sensitive: bool,
filter: BoundPredicate,
) -> Result<Arc<BoundPredicate>> {
{
let read = self.0.read().map_err(|_| {
Error::new(
ErrorKind::Unexpected,
"PartitionFilterCache RwLock was poisoned",
)
})?;
if read.contains_key(&spec_id) {
return Ok(read.get(&spec_id).unwrap().clone());
}
}
let partition_spec = table_metadata
.partition_spec_by_id(spec_id)
.ok_or(Error::new(
ErrorKind::Unexpected,
format!("Could not find partition spec for id {spec_id}"),
))?;
let partition_type = partition_spec.partition_type(schema)?;
let partition_fields = partition_type.fields().to_owned();
let partition_schema = Arc::new(
Schema::builder()
.with_schema_id(partition_spec.spec_id())
.with_fields(partition_fields)
.build()?,
);
let mut inclusive_projection = InclusiveProjection::new(partition_spec.clone());
let partition_filter = inclusive_projection
.project(&filter)?
.rewrite_not()
.bind(partition_schema.clone(), case_sensitive)?;
self.0
.write()
.map_err(|_| {
Error::new(
ErrorKind::Unexpected,
"PartitionFilterCache RwLock was poisoned",
)
})?
.insert(spec_id, Arc::new(partition_filter));
let read = self.0.read().map_err(|_| {
Error::new(
ErrorKind::Unexpected,
"PartitionFilterCache RwLock was poisoned",
)
})?;
Ok(read.get(&spec_id).unwrap().clone())
}
}
/// Cache of `ManifestEvaluator`s, keyed by an `i32` id (the `spec_id` passed
/// to `get`).
///
/// The map sits behind an `RwLock` and hands out entries as `Arc`s.
#[derive(Debug)]
pub(crate) struct ManifestEvaluatorCache(RwLock<HashMap<i32, Arc<ManifestEvaluator>>>);
impl ManifestEvaluatorCache {
    /// Creates an empty cache.
    pub(crate) fn new() -> Self {
        Self(RwLock::new(HashMap::new()))
    }

    /// Returns the `ManifestEvaluator` cached under `spec_id`, building it
    /// from a clone of `partition_filter` and caching it if there is no entry
    /// yet.
    ///
    /// # Panics
    ///
    /// Panics if the inner lock has been poisoned. The return type carries no
    /// error channel, so a poisoned lock cannot be reported any other way.
    pub(crate) fn get(
        &self,
        spec_id: i32,
        partition_filter: Arc<BoundPredicate>,
    ) -> Arc<ManifestEvaluator> {
        // Fast path. The read guard lives only inside this block so that it is
        // released before the write lock is requested below.
        {
            let read = self
                .0
                .read()
                .expect("ManifestEvaluatorCache RwLock was poisoned");
            if let Some(cached) = read.get(&spec_id) {
                return Arc::clone(cached);
            }
        }

        // Another thread may have filled the slot between the two locks; the
        // entry API only builds the evaluator when the slot is still empty.
        let mut write = self
            .0
            .write()
            .expect("ManifestEvaluatorCache RwLock was poisoned");
        let cached = write.entry(spec_id).or_insert_with(|| {
            Arc::new(ManifestEvaluator::builder(partition_filter.as_ref().clone()).build())
        });
        cached.clone()
    }
}
/// Cache of `ExpressionEvaluator`s, keyed by an `i32` id (the `spec_id`
/// passed to `get`).
///
/// The map sits behind an `RwLock` and hands out entries as `Arc`s.
#[derive(Debug)]
pub(crate) struct ExpressionEvaluatorCache(RwLock<HashMap<i32, Arc<ExpressionEvaluator>>>);
impl ExpressionEvaluatorCache {
pub(crate) fn new() -> Self {
Self(RwLock::new(HashMap::new()))
}
pub(crate) fn get(
&self,
spec_id: i32,
partition_filter: &BoundPredicate,
) -> Result<Arc<ExpressionEvaluator>> {
{
let read = self.0.read().map_err(|_| {
Error::new(
ErrorKind::Unexpected,
"PartitionFilterCache RwLock was poisoned",
)
})?;
if read.contains_key(&spec_id) {
return Ok(read.get(&spec_id).unwrap().clone());
}
}
self.0
.write()
.map_err(|_| {
Error::new(
ErrorKind::Unexpected,
"ManifestEvaluatorCache RwLock was poisoned",
)
})
.unwrap()
.insert(
spec_id,
Arc::new(ExpressionEvaluator::new(partition_filter.clone())),
);
let read = self
.0
.read()
.map_err(|_| {
Error::new(
ErrorKind::Unexpected,
"ManifestEvaluatorCache RwLock was poisoned",
)
})
.unwrap();
Ok(read.get(&spec_id).unwrap().clone())
}
}