1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
use crate::dataflow::graph::{Channel, Graph, Vertex};
pub mod channel_manager;
pub mod endpoints_manager;
pub fn schedule(graph: &Graph) -> Graph {
let mut scheduled_graph = graph.clone();
for stream in scheduled_graph.get_streams_ref_mut() {
let source_node_id = match stream.get_source() {
Vertex::Driver(node_id) => node_id,
Vertex::Operator(operator_id) => graph.get_operator(operator_id).unwrap().node_id,
};
let mut channels = Vec::new();
for channel in stream.get_channels() {
channels.push(match channel {
Channel::Unscheduled(cm) => {
let sink_node_id = match cm.sink {
Vertex::Driver(node_id) => node_id,
Vertex::Operator(operator_id) => {
graph.get_operator(operator_id).unwrap().node_id
}
};
if source_node_id == sink_node_id {
Channel::InterThread(cm)
} else {
Channel::InterNode(cm)
}
}
c => c,
});
}
stream.set_channels(channels);
}
scheduled_graph
}