use std::{
fmt::Debug,
marker::PhantomData,
};
use anyhow::Context;
use serde::de::DeserializeOwned;
use crate::core::{
operations::{
operator::FirehoseOperator,
signature::FirehoseOperatorSignature,
},
schema::{
BuildPlan,
FirehoseTableSchema,
},
};
pub trait FirehoseOperatorFactory: Debug + Send + Sync {
fn operator_id(&self) -> &String {
self.signature()
.operator_id
.as_ref()
.expect("Spec must have an operator id")
}
fn signature(&self) -> &FirehoseOperatorSignature;
fn init(
&self,
context: &dyn FirehoseOperatorInitContext,
) -> anyhow::Result<Box<dyn FirehoseOperator>>;
}
pub trait FirehoseOperatorInitContext {
fn operator_id(&self) -> &str;
fn table_schema(&self) -> &FirehoseTableSchema;
fn build_plan(&self) -> &BuildPlan;
fn signature(&self) -> &FirehoseOperatorSignature;
}
#[derive(Debug)]
pub struct SimpleConfigOperatorFactory<T>
where
T: DeserializeOwned + FirehoseOperator,
{
signature: FirehoseOperatorSignature,
phantom_data: PhantomData<T>,
}
impl<T> SimpleConfigOperatorFactory<T>
where
T: DeserializeOwned + FirehoseOperator,
{
pub fn new(spec: FirehoseOperatorSignature) -> Self {
if spec.operator_id.is_none() {
panic!("OperatorSpec must have an operator_id");
}
Self {
signature: spec,
phantom_data: PhantomData,
}
}
}
impl<T> FirehoseOperatorFactory for SimpleConfigOperatorFactory<T>
where
T: DeserializeOwned + FirehoseOperator,
{
fn signature(&self) -> &FirehoseOperatorSignature {
&self.signature
}
fn init(
&self,
context: &dyn FirehoseOperatorInitContext,
) -> anyhow::Result<Box<dyn FirehoseOperator>> {
let config = &context.build_plan().config;
let op: T = serde_json::from_value(config.clone()).with_context(|| {
format!(
"Failed to deserialize operator config for {}: {}",
self.signature.operator_id.as_deref().unwrap_or("unknown"),
serde_json::to_string_pretty(config).unwrap()
)
})?;
Ok(Box::new(op))
}
}
#[cfg(test)]
mod tests {
use serde::{
Deserialize,
Serialize,
};
use crate::{
core::{
operations::{
factory::SimpleConfigOperatorFactory,
operator::FirehoseOperator,
signature::{
FirehoseOperatorSignature,
ParameterSpec,
},
},
rows::FirehoseRowTransaction,
},
define_firehose_operator_id,
};
define_firehose_operator_id!(TEST_OP);
#[derive(Debug, Serialize, Deserialize)]
struct TestOperator {
pub value: i32,
}
impl FirehoseOperator for TestOperator {
fn apply_to_row(
&self,
_row: &mut FirehoseRowTransaction,
) -> anyhow::Result<()> {
todo!()
}
}
#[test]
fn test_simple_config_operator_factory() {
let signature = FirehoseOperatorSignature::from_operator_id(TEST_OP)
.with_input(ParameterSpec::new::<i32>("input"))
.with_output(ParameterSpec::new::<i32>("output"));
let _factory = SimpleConfigOperatorFactory::<TestOperator>::new(signature);
}
#[should_panic(expected = "OperatorSpec must have an operator_id")]
#[test]
fn test_simple_config_operator_factory_without_id() {
let signature = FirehoseOperatorSignature::default()
.with_input(ParameterSpec::new::<i32>("input"))
.with_output(ParameterSpec::new::<i32>("output"));
let _factory = SimpleConfigOperatorFactory::<TestOperator>::new(signature);
}
}