use super::conf::ApplicationConf;
use super::Application;
use crate::dataflow::{
dfg::{DFGNodeKind, GlobalNodeId},
stream::Context,
};
/// Builder that turns a stream [`Context`] into an [`Application`].
///
/// Optional settings are supplied through the chainable `name` and `config`
/// setters; `build` then produces the application.
#[derive(Default)]
pub struct ApplicationBuilder {
/// Stream context; `build` walks its `dfg.graph` to construct the application.
ctx: Context,
/// Name supplied through `name`; `None` until that setter is called.
// NOTE(review): `build` in this file never reads this field — confirm whether
// the name is meant to be forwarded to the `Application`.
name: Option<String>,
/// When `true`, `build` calls `with_debug_node` on the new application.
debug: bool,
/// Configuration cloned into `Application::with_conf` by `build`.
conf: ApplicationConf,
}
impl ApplicationBuilder {
    /// Creates a builder over `ctx` with no name and the default
    /// [`ApplicationConf`].
    ///
    /// `debug` is stored and consulted later by [`ApplicationBuilder::build`].
    pub(crate) fn new(ctx: Context, debug: bool) -> ApplicationBuilder {
        let conf = ApplicationConf::default();
        Self {
            ctx,
            name: None,
            debug,
            conf,
        }
    }

    /// Stores `name` on the builder and returns the builder for chaining.
    ///
    /// Calling this again overwrites the previously stored name.
    // NOTE(review): `build` below does not read the stored name — confirm
    // whether it is consumed elsewhere.
    pub fn name(&mut self, name: impl Into<String>) -> &mut Self {
        let owned: String = name.into();
        self.name = Some(owned);
        self
    }

    /// Replaces the builder's configuration with `conf` and returns the
    /// builder for chaining.
    pub fn config(&mut self, conf: ApplicationConf) -> &mut Self {
        self.conf = conf;
        self
    }

    /// Builds an [`Application`] from the builder's context and configuration.
    ///
    /// The application is created from a clone of the stored configuration
    /// and, when the debug flag is set, `with_debug_node` is called on it.
    /// The dataflow graph is then visited in reverse order:
    ///
    /// * `Node` entries are built through their constructor, receiving the
    ///   global node ids, a copy of the entry's input channels and the
    ///   channels produced by the previously built `Node` entry (empty for
    ///   the first one). The second element of every returned pair then
    ///   becomes the channel list handed to the next visited entry.
    /// * `Source` entries are built through their factory with the current
    ///   channel list, and the result is installed with `set_source_manager`.
    ///
    /// # Panics
    ///
    /// Panics if the graph still contains a `Placeholder` entry.
    pub fn build(&mut self) -> Application {
        let mut application = Application::with_conf(self.conf.clone());
        if self.debug {
            application.with_debug_node();
        }

        // Channels passed as outputs to the next visited entry; replaced each
        // time a `Node` entry has been built.
        // NOTE(review): presumably the graph is stored upstream-first, so the
        // reverse walk builds consumers before producers — confirm.
        let mut downstream = Vec::new();

        for entry in self.ctx.dfg.graph.iter().rev() {
            let operator_id = entry.get_operator_id();
            let inputs = entry.get_input_channels();
            // Pair every local node id with the id of the owning operator.
            let global_ids = entry
                .get_node_ids()
                .iter()
                .map(|local| GlobalNodeId {
                    operator_id,
                    node_id: *local,
                })
                .collect();

            match &entry.kind {
                DFGNodeKind::Placeholder => {
                    panic!("Critical Error, Stream built incorrectly");
                }
                DFGNodeKind::Source(factory) => {
                    let manager =
                        factory.build_source(downstream.clone(), Vec::new(), &mut application);
                    application.set_source_manager(manager);
                }
                DFGNodeKind::Node(constructor) => {
                    let built = constructor.build_nodes(
                        global_ids,
                        inputs.to_vec(),
                        downstream.clone(),
                        Vec::new(),
                        &mut application,
                    );
                    downstream = built.iter().map(|(_, channel)| channel.clone()).collect();
                }
            }
        }

        application
    }
}