1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
use std::cell::RefCell;
use std::rc::Rc;
use crate::api::data_stream::{DataStream, StreamBuilder};
use crate::api::function::InputFormat;
use crate::api::operator::StreamOperator;
use crate::api::properties::Properties;
use crate::api::runtime::OperatorId;
use crate::dag::RawStreamGraph;
use crate::runtime;
/// Contract for a user-defined streaming application.
///
/// Implementors must also be `Send + Sync + Clone`.
pub trait StreamApp: Send + Sync + Clone {
/// Receives the job's `Properties` mutably so the application can adjust them.
///
/// NOTE(review): presumably invoked before `build_stream` — confirm against the
/// `runtime` module, which is not visible here.
fn prepare_properties(&self, properties: &mut Properties);
/// Receives the properties read-only together with a mutable
/// `StreamExecutionEnvironment`, on which the application declares its
/// stream (for example through `register_source`).
fn build_stream(&self, properties: &Properties, env: &mut StreamExecutionEnvironment);
}
/// Entry point handed to a `StreamApp` for declaring its sources.
///
/// Pairs the application name with the `StreamManager` that every
/// registered source shares.
#[derive(Debug)]
pub struct StreamExecutionEnvironment {
/// Name of the application, as passed to the constructor.
pub(crate) application_name: String,
/// Reference-counted manager; `register_source` hands a clone of this `Rc`
/// to each `StreamBuilder` it creates.
pub(crate) stream_manager: Rc<StreamManager>,
}
impl StreamExecutionEnvironment {
    /// Creates an environment for `application_name`.
    ///
    /// The same name is also passed to the freshly created `StreamManager`,
    /// which is why it is cloned once here.
    pub(crate) fn new(application_name: String) -> Self {
        let manager = StreamManager::new(application_name.clone());
        Self {
            application_name,
            stream_manager: Rc::new(manager),
        }
    }

    /// Registers `input_format` as a source and returns the `DataStream`
    /// built on top of it.
    ///
    /// The input format is boxed and, together with a handle to this
    /// environment's `StreamManager` and the requested `parallelism`, passed
    /// to `StreamBuilder::with_source`.
    pub fn register_source<I>(&mut self, input_format: I, parallelism: u16) -> DataStream
    where
        I: InputFormat + 'static,
    {
        // Only the reference count is bumped; the manager itself stays shared.
        let manager = Rc::clone(&self.stream_manager);
        let builder = StreamBuilder::with_source(manager, Box::new(input_format), parallelism);
        DataStream::new(builder)
    }
}
/// Builds a `StreamExecutionEnvironment` named `application_name` and hands
/// it, together with `stream_app`, to `runtime::run`.
///
/// # Panics
///
/// Panics with the error's `Display` text if `runtime::run` returns `Err`.
pub fn execute<S>(application_name: &str, stream_app: S)
where
    S: StreamApp + 'static,
{
    let env = StreamExecutionEnvironment::new(application_name.to_string());

    // A successful run needs no further handling; only the failure path acts.
    if let Err(err) = runtime::run(env, stream_app) {
        panic!(
            "force panic when catch error in job startup process. msg: {}",
            err
        );
    }
}
/// Owner of the `RawStreamGraph` that operators are added to.
#[derive(Debug)]
pub(crate) struct StreamManager {
/// The graph sits in a `RefCell` so that `add_operator` can mutate it
/// while only holding `&self`.
pub(crate) stream_graph: RefCell<RawStreamGraph>,
}
impl StreamManager {
    /// Creates a manager wrapping a new `RawStreamGraph` built from
    /// `application_name`.
    pub fn new(application_name: String) -> Self {
        let graph = RawStreamGraph::new(application_name);
        Self {
            stream_graph: RefCell::new(graph),
        }
    }

    /// Adds `operator` to the graph with the given parents and returns the
    /// `OperatorId` produced by the graph.
    ///
    /// # Panics
    ///
    /// Panics with "add operator error" if the graph rejects the operator,
    /// or if the graph is already borrowed when this is called.
    pub fn add_operator(
        &self,
        operator: StreamOperator,
        parent_operator_ids: Vec<OperatorId>,
    ) -> OperatorId {
        let mut graph = self.stream_graph.borrow_mut();
        let added = graph.add_operator(operator, parent_operator_ids);
        added.expect("add operator error")
    }
}