use std::{
marker::PhantomData,
sync::{Arc, Mutex},
};
use crate::{
node::operator_executors::OperatorExecutorT,
scheduler::channel_manager::{ChannelManager, StreamEndpoints, StreamEndpointsT},
OperatorConfig, OperatorId,
};
mod abstract_graph;
mod job_graph;
pub(crate) mod default_graph;
pub(crate) use abstract_graph::AbstractGraph;
pub(crate) use job_graph::JobGraph;
use serde::Deserialize;
use super::{stream::StreamId, Data};
pub(crate) trait OperatorRunner:
'static + (Fn(Arc<Mutex<ChannelManager>>) -> Box<dyn OperatorExecutorT>) + Sync + Send
{
fn box_clone(&self) -> Box<dyn OperatorRunner>;
}
impl<
T: 'static
+ (Fn(Arc<Mutex<ChannelManager>>) -> Box<dyn OperatorExecutorT>)
+ Sync
+ Send
+ Clone,
> OperatorRunner for T
{
fn box_clone(&self) -> Box<dyn OperatorRunner> {
Box::new(self.clone())
}
}
pub(crate) trait StreamSetupHook:
'static + Fn(Arc<Mutex<ChannelManager>>) + Sync + Send
{
fn box_clone(&self) -> Box<dyn StreamSetupHook>;
}
impl<T: 'static + Fn(Arc<Mutex<ChannelManager>>) + Sync + Send + Clone> StreamSetupHook for T {
fn box_clone(&self) -> Box<dyn StreamSetupHook> {
Box::new(self.clone())
}
}
#[derive(Clone, Copy)]
pub(crate) enum Job {
Operator(OperatorId),
Driver,
}
#[derive(Clone)]
pub(crate) struct AbstractStream<D>
where
for<'a> D: Data + Deserialize<'a>,
{
id: StreamId,
name: String,
phantom: PhantomData<D>,
}
impl<D> AbstractStream<D>
where
for<'a> D: Data + Deserialize<'a>,
{
fn new(id: StreamId, name: String) -> Self {
Self {
id,
name,
phantom: PhantomData,
}
}
}
pub(crate) trait AbstractStreamT: Send + Sync {
fn id(&self) -> StreamId;
fn name(&self) -> String;
fn set_name(&mut self, name: String);
fn box_clone(&self) -> Box<dyn AbstractStreamT>;
fn to_stream_endpoints_t(&self) -> Box<dyn StreamEndpointsT>;
}
impl<D> AbstractStreamT for AbstractStream<D>
where
for<'a> D: Data + Deserialize<'a>,
{
fn id(&self) -> StreamId {
self.id
}
fn name(&self) -> String {
self.name.clone()
}
fn set_name(&mut self, name: String) {
self.name = name;
}
fn box_clone(&self) -> Box<dyn AbstractStreamT> {
Box::new(self.clone())
}
fn to_stream_endpoints_t(&self) -> Box<dyn StreamEndpointsT> {
Box::new(StreamEndpoints::<D>::new(self.id))
}
}
pub(crate) struct AbstractOperator {
pub id: OperatorId,
pub runner: Box<dyn OperatorRunner>,
pub config: OperatorConfig,
pub read_streams: Vec<StreamId>,
pub write_streams: Vec<StreamId>,
}
impl Clone for AbstractOperator {
fn clone(&self) -> Self {
Self {
id: self.id,
runner: self.runner.box_clone(),
config: self.config.clone(),
read_streams: self.read_streams.clone(),
write_streams: self.write_streams.clone(),
}
}
}