use serde::Deserialize;
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::{
self,
sync::{broadcast, mpsc},
};
use crate::{
dataflow::{
operator::{OperatorConfig, Source},
stream::WriteStreamT,
Data, Message, Timestamp, WriteStream,
},
node::{
lattice::ExecutionLattice,
operator_executors::{OperatorExecutorHelper, OperatorExecutorT},
worker::{EventNotification, OperatorExecutorNotification, WorkerNotification},
},
OperatorId,
};
pub struct SourceExecutor<O, T>
where
O: Source<T>,
T: Data + for<'a> Deserialize<'a>,
{
config: OperatorConfig,
operator: O,
write_stream: WriteStream<T>,
helper: OperatorExecutorHelper,
}
impl<O, T> SourceExecutor<O, T>
where
O: Source<T>,
T: Data + for<'a> Deserialize<'a>,
{
pub fn new(
config: OperatorConfig,
operator_fn: impl Fn() -> O + Send,
write_stream: WriteStream<T>,
) -> Self {
let operator_id = config.id;
Self {
config,
operator: operator_fn(),
write_stream,
helper: OperatorExecutorHelper::new(operator_id),
}
}
pub(crate) async fn execute(
&mut self,
_channel_from_worker: broadcast::Receiver<OperatorExecutorNotification>,
channel_to_worker: mpsc::UnboundedSender<WorkerNotification>,
_channel_to_event_runners: broadcast::Sender<EventNotification>,
) {
self.helper.synchronize().await;
tracing::debug!(
"Node {}: running operator {}",
self.config.node_id,
self.config.get_name()
);
tokio::task::block_in_place(|| self.operator.run(&self.config, &mut self.write_stream));
tokio::task::block_in_place(|| self.operator.destroy());
if !self.write_stream.is_closed() {
self.write_stream
.send(Message::new_watermark(Timestamp::Top))
.unwrap();
}
channel_to_worker
.send(WorkerNotification::DestroyedOperator(self.operator_id()))
.unwrap();
}
}
impl<O, T> OperatorExecutorT for SourceExecutor<O, T>
where
O: Source<T>,
T: Data + for<'a> Deserialize<'a>,
{
fn execute<'a>(
&'a mut self,
channel_from_worker: broadcast::Receiver<OperatorExecutorNotification>,
channel_to_worker: mpsc::UnboundedSender<WorkerNotification>,
channel_to_event_runners: broadcast::Sender<EventNotification>,
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>> {
Box::pin(self.execute(
channel_from_worker,
channel_to_worker,
channel_to_event_runners,
))
}
fn lattice(&self) -> Arc<ExecutionLattice> {
Arc::clone(&self.helper.lattice)
}
fn operator_id(&self) -> OperatorId {
self.config.id
}
}