use std::{
collections::BTreeMap,
fmt::Debug,
sync::Arc,
};
use anyhow::{
Context,
bail,
};
use crate::core::{
operations::{
factory::{
FirehoseOperatorFactory,
FirehoseOperatorInitContext,
},
operator::FirehoseOperator,
planner::OperationPlan,
signature::FirehoseOperatorSignature,
},
schema::{
BuildPlan,
DataTypeDescription,
FirehoseTableSchema,
},
};
/// A registry of operator factories, plus default logic for validating and
/// instantiating operators from build plans.
///
/// Implementors only have to provide [`operators`](Self::operators); every
/// other method has a default implementation layered on top of it.
pub trait FirehoseOperatorEnvironment: Debug + Send + Sync {
    /// Returns the map from operator id to the factory registered under it.
    fn operators(&self) -> &BTreeMap<String, Arc<dyn FirehoseOperatorFactory>>;

    /// Fetches a shared handle to the factory registered under `operator_id`.
    ///
    /// # Errors
    ///
    /// Fails when no factory is registered under `operator_id`.
    fn lookup_operator_factory(
        &self,
        operator_id: &str,
    ) -> anyhow::Result<Arc<dyn FirehoseOperatorFactory>> {
        self.operators()
            .get(operator_id)
            .cloned()
            .with_context(|| format!("Operator '{operator_id}' not found in environment."))
    }

    /// Checks that an operator can be built for `plan_context`.
    ///
    /// Validation is performed by actually initialising the operator via
    /// [`init_operator`](Self::init_operator) and discarding the result.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`init_operator`](Self::init_operator).
    fn validate_context(
        &self,
        plan_context: BuildPlanContext,
    ) -> anyhow::Result<()> {
        let _operator = self.init_operator(plan_context)?;
        Ok(())
    }

    /// Builds the operator described by `plan_context`.
    ///
    /// The factory is resolved from the plan's operator id, the context is
    /// bound to (and validated against) the factory's signature, and the
    /// factory is then asked to initialise the operator.
    ///
    /// # Errors
    ///
    /// Fails when the factory lookup, the signature binding, or the factory's
    /// own `init` fails.
    fn init_operator(
        &self,
        plan_context: BuildPlanContext,
    ) -> anyhow::Result<Box<dyn FirehoseOperator>> {
        let operator_id = plan_context.operator_id();
        let factory = self.lookup_operator_factory(operator_id)?;
        let bound = plan_context.bind_signature(factory.signature())?;
        factory.init(&bound)
    }

    /// Turns `planner` into a build plan, validates it, and extends `schema`
    /// with it.
    ///
    /// The plan is first applied to a clone of `schema` and validated there;
    /// the caller's `schema` is only extended once that validation succeeded.
    ///
    /// # Errors
    ///
    /// Fails when the operator is unknown, when planning against the factory
    /// signature fails, when the trial extension or validation fails, or when
    /// extending the real schema fails.
    fn apply_plan_to_schema(
        &self,
        schema: &mut FirehoseTableSchema,
        planner: OperationPlan,
    ) -> anyhow::Result<BuildPlan> {
        let factory = self.lookup_operator_factory(&planner.operator_id)?;
        let (plan, output_cols) = planner.plan_for_signature(factory.signature())?;

        // Dry run: apply the plan to a scratch copy and make sure the operator
        // can be initialised against the resulting schema.
        let mut scratch = schema.clone();
        scratch.extend_via_plan(plan.clone(), &output_cols)?;
        let trial = BuildPlanContext::new(Arc::new(scratch), Arc::new(plan.clone()));
        self.validate_context(trial)?;

        // The dry run passed; apply the same extension to the real schema.
        schema.extend_via_plan(plan.clone(), &output_cols)?;
        Ok(plan)
    }
}
/// A [`FirehoseOperatorEnvironment`] backed by an in-memory map of operator
/// factories keyed by operator id.
#[derive(Debug)]
pub struct MapOpEnvironment {
/// Registered factories, keyed by operator id.
operators: BTreeMap<String, Arc<dyn FirehoseOperatorFactory>>,
}
impl Default for MapOpEnvironment {
fn default() -> Self {
Self::new()
}
}
impl MapOpEnvironment {
    /// Creates an environment with no registered operators.
    pub fn new() -> Self {
        Self {
            operators: BTreeMap::new(),
        }
    }

    /// Creates an environment and registers every factory in `factories`.
    ///
    /// # Errors
    ///
    /// Fails when two factories report the same operator id.
    pub fn from_operators(
        factories: Vec<Arc<dyn FirehoseOperatorFactory>>
    ) -> anyhow::Result<Self> {
        let mut this = Self::new();
        this.add_all_operators(factories)?;
        Ok(this)
    }

    /// Registers `factory` under the id it reports via `operator_id()`.
    ///
    /// # Errors
    ///
    /// Fails, leaving the environment unchanged, when a factory is already
    /// registered under the same id.
    pub fn add_operator(
        &mut self,
        factory: Arc<dyn FirehoseOperatorFactory>,
    ) -> anyhow::Result<()> {
        // A single entry lookup replaces the former `contains_key` + `insert`
        // pair, so the map is only searched once.
        match self.operators.entry(factory.operator_id().clone()) {
            std::collections::btree_map::Entry::Occupied(slot) => {
                bail!(
                    "Operator with ID '{}' already exists in MapOpEnvironment.",
                    slot.key()
                );
            }
            std::collections::btree_map::Entry::Vacant(slot) => {
                slot.insert(factory);
                Ok(())
            }
        }
    }

    /// Registers every factory in `factories`, in order.
    ///
    /// # Errors
    ///
    /// Stops at the first factory whose id is already registered and returns
    /// that error; factories added before it stay registered.
    pub fn add_all_operators(
        &mut self,
        factories: Vec<Arc<dyn FirehoseOperatorFactory>>,
    ) -> anyhow::Result<()> {
        // Each `Arc` is already owned by the loop, so it is moved into
        // `add_operator` rather than cloned.
        for factory in factories {
            self.add_operator(factory)?;
        }
        Ok(())
    }
}
impl FirehoseOperatorEnvironment for MapOpEnvironment {
    /// Exposes the internal factory map.
    fn operators(&self) -> &BTreeMap<String, Arc<dyn FirehoseOperatorFactory>> {
        let Self { operators } = self;
        operators
    }
}
/// Pairs a table schema with a build plan.
///
/// Both parts are held behind `Arc`, so cloning the context only clones the
/// two handles.
#[derive(Debug, Clone)]
pub struct BuildPlanContext {
/// The table schema the plan is evaluated against.
table_schema: Arc<FirehoseTableSchema>,
/// The build plan for a single operator.
build_plan: Arc<BuildPlan>,
}
impl BuildPlanContext {
    /// Bundles a shared table schema and a shared build plan into a context.
    pub fn new(
        table_schema: Arc<FirehoseTableSchema>,
        build_plan: Arc<BuildPlan>,
    ) -> Self {
        Self {
            table_schema,
            build_plan,
        }
    }

    /// The operator id recorded in the build plan.
    pub fn operator_id(&self) -> &str {
        &self.build_plan.operator_id
    }

    /// The table schema this context refers to.
    pub fn table_schema(&self) -> &FirehoseTableSchema {
        self.table_schema.as_ref()
    }

    /// The build plan this context refers to.
    pub fn build_plan(&self) -> &BuildPlan {
        self.build_plan.as_ref()
    }

    /// Consumes the context and binds it to a copy of `signature`.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`OperationInitializationContext::init`].
    pub fn bind_signature(
        self,
        signature: &FirehoseOperatorSignature,
    ) -> anyhow::Result<OperationInitializationContext> {
        let owned_signature = signature.clone();
        OperationInitializationContext::init(self, owned_signature)
    }

    /// Maps each input parameter name of the plan to the data type of the
    /// schema column it is bound to.
    ///
    /// NOTE(review): the schema is indexed by column name; this presumably
    /// panics if the plan names a column the schema lacks — confirm against
    /// the schema's `Index` implementation.
    pub fn input_types(&self) -> BTreeMap<String, DataTypeDescription> {
        let mut types = BTreeMap::new();
        for (pname, cname) in self.build_plan.inputs.iter() {
            let data_type = self.table_schema[cname.as_ref()].data_type.clone();
            types.insert(pname.clone(), data_type);
        }
        types
    }

    /// Maps each output parameter name of the plan to the data type of the
    /// schema column it is bound to.
    ///
    /// NOTE(review): the schema is indexed by column name; this presumably
    /// panics if the plan names a column the schema lacks — confirm against
    /// the schema's `Index` implementation.
    pub fn output_types(&self) -> BTreeMap<String, DataTypeDescription> {
        let mut types = BTreeMap::new();
        for (pname, cname) in self.build_plan.outputs.iter() {
            let data_type = self.table_schema[cname.as_ref()].data_type.clone();
            types.insert(pname.clone(), data_type);
        }
        types
    }
}
/// A [`BuildPlanContext`] together with the operator signature it is bound to.
///
/// This is the context type passed to a factory's `init` by the environment.
#[derive(Debug, Clone)]
pub struct OperationInitializationContext {
/// The schema/plan pair the operator is being initialised for.
plan_context: BuildPlanContext,
/// The operator signature bound to the plan.
signature: FirehoseOperatorSignature,
}
impl FirehoseOperatorInitContext for OperationInitializationContext {
    /// The operator id recorded in the wrapped build plan.
    fn operator_id(&self) -> &str {
        self.plan_context.operator_id()
    }

    /// The table schema of the wrapped plan context.
    fn table_schema(&self) -> &FirehoseTableSchema {
        self.plan_context.table_schema()
    }

    /// The build plan of the wrapped plan context.
    fn build_plan(&self) -> &BuildPlan {
        self.plan_context.build_plan()
    }

    /// The operator signature bound to this context.
    fn signature(&self) -> &FirehoseOperatorSignature {
        let Self { signature, .. } = self;
        signature
    }
}
impl OperationInitializationContext {
    /// Validates `signature` against the plan's input and output types and,
    /// on success, wraps both into an initialisation context.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the signature's `validate` when the
    /// plan's input/output types are rejected.
    pub fn init(
        plan_context: BuildPlanContext,
        signature: FirehoseOperatorSignature,
    ) -> anyhow::Result<Self> {
        let inputs = plan_context.input_types();
        let outputs = plan_context.output_types();
        signature.validate(&inputs, &outputs)?;

        let bound = Self {
            plan_context,
            signature,
        };
        Ok(bound)
    }

    /// The wrapped schema/plan pair.
    pub fn plan_context(&self) -> &BuildPlanContext {
        let Self { plan_context, .. } = self;
        plan_context
    }

    /// The table schema of the wrapped plan context.
    pub fn table_schema(&self) -> &FirehoseTableSchema {
        self.plan_context.table_schema()
    }

    /// The operator signature bound to this context.
    pub fn signature(&self) -> &FirehoseOperatorSignature {
        let Self { signature, .. } = self;
        signature
    }
}