use std::{
fmt::Debug,
sync::Arc,
};
use serde::{
Deserialize,
Serialize,
};
use crate::core::{
operations::{
environment::{
BuildPlanContext,
FirehoseOperatorEnvironment,
},
signature::FirehoseOperatorSignature,
},
rows::{
FirehoseBatchTransaction,
FirehoseRowBatch,
FirehoseRowTransaction,
},
schema::{
BuildPlan,
FirehoseTableSchema,
},
};
/// Scheduling hints an operator reports to whoever drives it.
///
/// Returned by [`FirehoseOperator::scheduling_metadata`] and forwarded unchanged by
/// [`OperationRunner::scheduling_metadata`].
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct OperatorSchedulingMetadata {
    /// Batch size the operator reports; the trait default is `1`.
    /// NOTE(review): presumably the preferred number of rows per call — confirm with the scheduler.
    pub effective_batch_size: usize,
}
/// A transformation that an [`OperationRunner`] applies to row batches.
///
/// Implementors must provide [`FirehoseOperator::apply_to_row`]. Both
/// [`FirehoseOperator::scheduling_metadata`] and [`FirehoseOperator::apply_to_batch`]
/// have defaults. The `'static + Send + Sync` supertraits are what let a
/// `Box<dyn FirehoseOperator>` (and therefore an `OperationRunner`) be `Send`.
pub trait FirehoseOperator: 'static + Send + Sync + Debug {
    /// Returns scheduling hints for this operator.
    ///
    /// The default reports an `effective_batch_size` of `1`.
    fn scheduling_metadata(&self) -> OperatorSchedulingMetadata {
        OperatorSchedulingMetadata {
            effective_batch_size: 1,
        }
    }

    /// Applies this operator to every row in `txn`.
    ///
    /// The default implementation calls [`FirehoseOperator::apply_to_row`] for each
    /// row index `0..txn.len()` in order. Override it when the operator can process
    /// a batch as a whole.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by `apply_to_row`. Rows after the failing
    /// one are not visited.
    // `#[must_use]` is intentionally absent: `Result` is already `#[must_use]`
    // (adding it again trips `clippy::double_must_use`).
    fn apply_to_batch(
        &self,
        txn: &mut FirehoseBatchTransaction,
    ) -> anyhow::Result<()> {
        // Index loop rather than an iterator: each row transaction mutably
        // borrows `txn`, so only one can be alive at a time.
        for index in 0..txn.len() {
            let mut row_txn = txn.mut_row_transaction(index);
            self.apply_to_row(&mut row_txn)?;
        }
        Ok(())
    }

    /// Applies this operator to a single row.
    ///
    /// # Errors
    ///
    /// Implementation-defined; any error aborts the default `apply_to_batch`.
    fn apply_to_row(
        &self,
        txn: &mut FirehoseRowTransaction,
    ) -> anyhow::Result<()>;
}
/// Holds an initialized [`FirehoseOperator`] together with the plan it was built
/// from, and applies it to row batches.
pub struct OperationRunner {
    /// Schema of the table the plan was built against.
    pub table_schema: Arc<FirehoseTableSchema>,
    /// Plan this runner executes (carries the operator id, inputs and outputs).
    pub build_plan: Arc<BuildPlan>,
    /// Operator signature, copied from the operator's factory at construction.
    pub signature: Arc<FirehoseOperatorSignature>,
    /// The initialized operator. It is private, so callers go through the runner's methods.
    operator: Box<dyn FirehoseOperator>,
}
impl Debug for OperationRunner {
    /// Formats the runner for diagnostics.
    ///
    /// `{:?}` prints a compact summary (operator id, inputs, outputs) taken from the
    /// build plan. `{:#?}` prints the whole build plan instead.
    fn fmt(
        &self,
        f: &mut std::fmt::Formatter<'_>,
    ) -> std::fmt::Result {
        // Read the flag before `debug_struct` takes the mutable borrow of `f`.
        let verbose = f.alternate();
        // Label the output with the actual type name so logs identify it correctly.
        let mut out = f.debug_struct("OperationRunner");
        if verbose {
            out.field("build_plan", &self.build_plan);
        } else {
            out.field("id", &self.build_plan.operator_id)
                .field("inputs", &self.build_plan.inputs)
                .field("outputs", &self.build_plan.outputs);
        }
        out.finish()
    }
}
impl OperationRunner {
    /// Creates a runner for `build_plan` against `table_schema`.
    ///
    /// The operator's signature is taken from the factory that `env` returns for the
    /// plan's operator id. The operator itself is initialized by `env` from a
    /// [`BuildPlanContext`].
    ///
    /// # Errors
    ///
    /// Returns an error if `env` fails to look up the operator factory or fails to
    /// initialize the operator.
    pub fn new_for_plan(
        table_schema: Arc<FirehoseTableSchema>,
        build_plan: Arc<BuildPlan>,
        env: &dyn FirehoseOperatorEnvironment,
    ) -> anyhow::Result<OperationRunner> {
        // The parameters are already owned `Arc`s. Only the context needs its own
        // handles; the originals move into the runner below.
        let build_plan_context =
            BuildPlanContext::new(Arc::clone(&table_schema), Arc::clone(&build_plan));
        let signature = Arc::new(
            env.lookup_operator_factory(build_plan_context.operator_id())?
                .signature()
                .clone(),
        );
        let operator = env.init_operator(build_plan_context)?;
        Ok(OperationRunner {
            table_schema,
            build_plan,
            operator,
            signature,
        })
    }

    /// Returns the scheduling hints reported by the wrapped operator.
    pub fn scheduling_metadata(&self) -> OperatorSchedulingMetadata {
        self.operator.scheduling_metadata()
    }

    /// Runs the operator over `batch` inside a [`FirehoseBatchTransaction`].
    ///
    /// `commit` is called only after the operator succeeds. If the operator returns
    /// an error, this function returns early and never reaches `commit`.
    ///
    /// # Errors
    ///
    /// Returns the operator's error, or any error produced while committing.
    pub fn apply_to_batch(
        &self,
        batch: &mut FirehoseRowBatch,
    ) -> anyhow::Result<()> {
        let mut txn = FirehoseBatchTransaction::new(
            batch,
            Arc::clone(&self.build_plan),
            Arc::clone(&self.signature),
        );
        self.operator.apply_to_batch(&mut txn)?;
        txn.commit()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Type-level check: compiles only when `T: Send`.
    fn require_send<T: Send>() {}

    #[test]
    fn test_operation_runner_is_send() {
        // The real check is at compile time; the call simply instantiates it.
        require_send::<OperationRunner>();
    }
}